http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h new file mode 100644 index 0000000..cde4fe1 --- /dev/null +++ b/be/src/exec/parquet/parquet-column-readers.h @@ -0,0 +1,476 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_PARQUET_COLUMN_READERS_H +#define IMPALA_PARQUET_COLUMN_READERS_H + +#include <boost/scoped_ptr.hpp> + +#include "exec/parquet/hdfs-parquet-scanner.h" +#include "exec/parquet/parquet-level-decoder.h" +#include "util/bit-stream-utils.h" +#include "util/codec.h" + +namespace impala { + +class DictDecoderBase; +class Tuple; +class MemPool; + +/// Base class for reading a Parquet column. Reads a logical column, not necessarily a +/// column materialized in the file (e.g. collections). The two subclasses are +/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep +/// level pair at a time. 
The current def and rep level are exposed to the user, and the +/// corresponding value (if defined) can optionally be copied into a slot via +/// ReadValue(). Can also write position slots. +/// +/// The constructor adds the object to the obj_pool of the parent HdfsParquetScanner. +class ParquetColumnReader { + public: + /// Creates a column reader for 'node' and associates it with the given parent scanner. + /// The constructor of column readers add the new object to the parent's object pool. + /// 'slot_desc' may be NULL, in which case the returned column reader can only be used + /// to read def/rep levels. + /// 'is_collection_field' should be set to true if the returned reader is reading a + /// collection. This cannot be determined purely by 'node' because a repeated scalar + /// node represents both an array and the array's items (in this case + /// 'is_collection_field' should be true if the reader reads one value per array, and + /// false if it reads one value per item). The reader is added to the runtime state's + /// object pool. Does not create child readers for collection readers; these must be + /// added by the caller. + /// + /// It supports the following primitive type widening that does not have any loss of + /// precision. 
+ /// - tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64), double (DOUBLE) + /// - smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE) + /// - int (INT32) -> bigint (INT64), double (DOUBLE) + /// - float (FLOAT) -> double (DOUBLE) + static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field, + const SlotDescriptor* slot_desc, HdfsParquetScanner* parent); + + static ParquetColumnReader* CreateTimestampColumnReader(const SchemaNode& node, + const SlotDescriptor* slot_desc, HdfsParquetScanner* parent); + + virtual ~ParquetColumnReader() { } + + int def_level() const { return def_level_; } + int rep_level() const { return rep_level_; } + + const SlotDescriptor* slot_desc() const { return slot_desc_; } + const parquet::SchemaElement& schema_element() const { return *node_.element; } + int16_t max_def_level() const { return max_def_level_; } + int16_t max_rep_level() const { return max_rep_level_; } + int def_level_of_immediate_repeated_ancestor() const { + return node_.def_level_of_immediate_repeated_ancestor; + } + const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; } + void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) { + DCHECK(pos_slot_desc_ == NULL); + pos_slot_desc_ = pos_slot_desc; + } + + /// Returns true if this reader materializes collections (i.e. CollectionValues). + virtual bool IsCollectionReader() const = 0; + + const char* filename() const { return parent_->filename(); } + + /// Read the current value (or null) into 'tuple' for this column. This should only be + /// called when a value is defined, i.e., def_level() >= + /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce + /// no output values), otherwise NextLevels() should be called instead. + /// + /// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be + /// called after calling ReadValue()). 
+ /// + /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is + /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns + /// true. + /// + /// NextLevels() must be called on this reader before calling ReadValue() for the first + /// time. This is to initialize the current value that ReadValue() will read. + /// + /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue()) + /// The codegened functions from all the materialized cols will then be combined + /// into one function. + /// TODO: another option is to materialize col by col for the entire row batch in + /// one call. e.g. MaterializeCol would write out 1024 values. Our row batches + /// are currently dense so we'll need to figure out something there. + virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0; + + /// Same as ReadValue() but does not advance repetition level. Only valid for columns + /// not in collections. + virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0; + + /// Batched version of ReadValue() that reads up to max_values at once and materializes + /// them into tuples in tuple_mem. Returns the number of values actually materialized + /// in *num_values. The return value, error behavior and state changes are generally + /// the same as in ReadValue(). For example, if an error occurs in the middle of + /// materializing a batch then false is returned, and num_values, tuple_mem, as well as + /// this column reader are left in an undefined state, assuming that the caller will + /// immediately abort execution. NextLevels() does *not* need to be called before + /// ReadValueBatch(), unlike ReadValue(). + virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, + uint8_t* tuple_mem, int* num_values) = 0; + + /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and + /// materializes them into tuples in tuple_mem. 
+ /// The return value and error behavior are the same as in ReadValueBatch(). + virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size, + uint8_t* tuple_mem, int* num_values) = 0; + + /// Advances this column reader's def and rep levels to the next logical value, i.e. to + /// the next scalar value or the beginning of the next collection, without attempting to + /// read the value. This is used to skip past def/rep levels that don't materialize a + /// value, such as the def/rep levels corresponding to an empty containing collection. + /// + /// NextLevels() must be called on this reader before calling ReadValue() for the first + /// time. This is to initialize the current value that ReadValue() will read. + /// + /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is + /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns + /// true. + virtual bool NextLevels() = 0; + + /// Writes pos_current_value_ (i.e. "reads" the synthetic position field of the + /// parent collection) to 'pos' and increments pos_current_value_. Only valid to + /// call when doing non-batched reading, i.e. NextLevels() must have been called + /// before each call to this function to advance to the next element in the + /// collection. + inline void ReadPositionNonBatched(int64_t* pos); + + /// Returns true if this column reader has reached the end of the row group. + inline bool RowGroupAtEnd() { + DCHECK_EQ(rep_level_ == ParquetLevel::ROW_GROUP_END, + def_level_ == ParquetLevel::ROW_GROUP_END); + return rep_level_ == ParquetLevel::ROW_GROUP_END; + } + + /// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it, + /// and frees up other resources. If 'row_batch' is NULL frees all resources instead. 
+ virtual void Close(RowBatch* row_batch) = 0; + + protected: + HdfsParquetScanner* parent_; + const SchemaNode& node_; + const SlotDescriptor* const slot_desc_; + + /// The slot descriptor for the position field of the tuple, if there is one. NULL if + /// there's not. Only one column reader for a given tuple desc will have this set. + const SlotDescriptor* pos_slot_desc_; + + /// The next value to write into the position slot, if there is one. 64-bit int because + /// the pos slot is always a BIGINT Set to ParquetLevel::INVALID_POS when this column + /// reader does not have a current rep and def level (i.e. before the first NextLevels() + /// call or after the last value in the column has been read). + int64_t pos_current_value_; + + /// The current repetition and definition levels of this reader. Advanced via + /// ReadValue() and NextLevels(). Set to ParquetLevel:: INVALID_LEVEL before the first + /// NextLevels() call for a row group or if an error is encountered decoding a level. + /// Set to ROW_GROUP_END after the last value in the column has been read). If this is + /// not inside a collection, rep_level_ is always 0, ParquetLevel::INVALID_LEVEL or + /// ParquetLevel::ROW_GROUP_END. + /// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values + /// ParquetLevel::INVALID_LEVEL and ParquetLevel::ROW_GROUP_END. The maximum values are + /// cached here because they are accessed in inner loops. + int16_t rep_level_; + const int16_t max_rep_level_; + int16_t def_level_; + const int16_t max_def_level_; + + // Cache frequently accessed members of slot_desc_ for perf. + + /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL. + const int tuple_offset_; + + /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL. 
+ const NullIndicatorOffset null_indicator_offset_; + + ParquetColumnReader( + HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc) + : parent_(parent), + node_(node), + slot_desc_(slot_desc), + pos_slot_desc_(NULL), + pos_current_value_(ParquetLevel::INVALID_POS), + rep_level_(ParquetLevel::INVALID_LEVEL), + max_rep_level_(node_.max_rep_level), + def_level_(ParquetLevel::INVALID_LEVEL), + max_def_level_(node_.max_def_level), + tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()), + null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset() : + slot_desc->null_indicator_offset()) { + DCHECK(parent != nullptr); + parent->obj_pool_.Add(this); + + DCHECK_GE(node_.max_rep_level, 0); + DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max()); + DCHECK_GE(node_.max_def_level, 0); + DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max()); + // rep_level_ is always valid and equal to 0 if col not in collection. + if (max_rep_level() == 0) rep_level_ = 0; + } + + /// Called in the middle of creating a scratch tuple batch to simulate failures + /// such as exceeding memory limit or cancellation. Returns false if the debug + /// action deems that the parquet column reader should halt execution. 'val_count' + /// is the counter which the column reader uses to track the number of tuples + /// produced so far. If the column reader should halt execution, 'parse_status_' + /// is updated with the error status and 'val_count' is set to 0. + inline bool ColReaderDebugAction(int* val_count); +}; + +/// Reader for a single column from the parquet file. It's associated with a +/// ScannerContext::Stream and is responsible for decoding the data. Super class for +/// per-type column readers. This contains most of the logic, the type specific functions +/// must be implemented in the subclass. 
+class BaseScalarColumnReader : public ParquetColumnReader { + public: + BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, + const SlotDescriptor* slot_desc) + : ParquetColumnReader(parent, node, slot_desc), + data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) { + DCHECK_GE(node_.col_idx, 0) << node_.DebugString(); + } + + virtual ~BaseScalarColumnReader() { } + + virtual bool IsCollectionReader() const { return false; } + + /// Resets the reader for each row group in the file and creates the scan + /// range for the column, but does not start it. To start scanning, + /// set_io_reservation() must be called to assign reservation to this + /// column, followed by StartScan(). + Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk, + int row_group_idx); + + /// Starts the column scan range. The reader must be Reset() and have a + /// reservation assigned via set_io_reservation(). This must be called + /// before any of the column data can be read (including dictionary and + /// data pages). Returns an error status if there was an error starting the + /// scan or allocating buffers for it. + Status StartScan(); + + /// Helper to start scans for multiple columns at once. + static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) { + for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan()); + return Status::OK(); + } + + virtual void Close(RowBatch* row_batch); + + io::ScanRange* scan_range() const { return scan_range_; } + int64_t total_len() const { return metadata_->total_compressed_size; } + int col_idx() const { return node_.col_idx; } + THdfsCompression::type codec() const { + if (metadata_ == NULL) return THdfsCompression::NONE; + return ConvertParquetToImpalaCodec(metadata_->codec); + } + void set_io_reservation(int bytes) { io_reservation_ = bytes; } + + /// Reads the next definition and repetition levels for this column. 
Initializes the + /// next data page if necessary. + virtual bool NextLevels() { return NextLevels<true>(); } + + /// Check the data stream to see if there is a dictionary page. If there is, + /// use that page to initialize dict_decoder_ and advance the data stream + /// past the dictionary page. + Status InitDictionary(); + + /// Convenience function to initialize multiple dictionaries. + static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers); + + // Returns the dictionary or NULL if the dictionary doesn't exist + virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; } + + // Returns whether the datatype for this column requires conversion from the on-disk + // format for correctness. For example, timestamps can require an offset to be + // applied. + virtual bool NeedsConversion() { return false; } + + // Returns whether the datatype for this column requires validation. For example, + // the timestamp format has certain bit combinations that are invalid, and these + // need to be validated when read from disk. + virtual bool NeedsValidation() { return false; } + + // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if + // we know this row can be skipped. This could be very useful with stats and big + // sections can be skipped. Implement that when we can benefit from it. + + protected: + // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup()) + friend class HdfsParquetScanner; + + // Class members that are accessed for every column should be included up here so they + // fit in as few cache lines as possible. + + /// Pointer to start of next value in data page + uint8_t* data_ = nullptr; + + /// End of the data page. + const uint8_t* data_end_ = nullptr; + + /// Decoder for definition levels. + ParquetLevelDecoder def_levels_{true}; + + /// Decoder for repetition levels. + ParquetLevelDecoder rep_levels_{false}; + + /// Page encoding for values of the current data page. 
Cached here for perf. Set in + /// InitDataPage(). + parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; + + /// Num values remaining in the current data page + int num_buffered_values_ = 0; + + // Less frequently used members that are not accessed in inner loop should go below + // here so they do not occupy precious cache line space. + + /// The number of values seen so far. Updated per data page. + int64_t num_values_read_ = 0; + + /// Metadata for the column for the current row group. + const parquet::ColumnMetaData* metadata_ = nullptr; + + boost::scoped_ptr<Codec> decompressor_; + + /// The scan range for the column's data. Initialized for each row group by Reset(). + io::ScanRange* scan_range_ = nullptr; + + // Stream used to read data from 'scan_range_'. Initialized by StartScan(). + ScannerContext::Stream* stream_ = nullptr; + + /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set + /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group + /// by Reset(). + int64_t io_reservation_ = 0; + + /// Pool to allocate storage for data pages from - either decompression buffers for + /// compressed data pages or copies of the data page with var-len data to attach to + /// batches. + boost::scoped_ptr<MemPool> data_page_pool_; + + /// Header for current data page. + parquet::PageHeader current_page_header_; + + /// Reads the next page header into next_page_header/next_header_size. + /// If the stream reaches the end before reading a complete page header, + /// eos is set to true. If peek is false, the stream position is advanced + /// past the page header. If peek is true, the stream position is not moved. + /// Returns an error status if the next page header could not be read. + Status ReadPageHeader(bool peek, parquet::PageHeader* next_page_header, + uint32_t* next_header_size, bool* eos); + + /// Read the next data page. 
If a dictionary page is encountered, that will be read and + /// this function will continue reading the next data page. + Status ReadDataPage(); + + /// Try to move the the next page and buffer more values. Return false and + /// sets rep_level_, def_level_ and pos_current_value_ to -1 if no more pages or an + /// error encountered. + bool NextPage(); + + /// Implementation for NextLevels(). + template <bool ADVANCE_REP_LEVEL> + bool NextLevels(); + + /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a + /// dictionary decoder stored in this object. Subclass must implement this. Returns + /// an error status if the dictionary values could not be decoded successfully. + virtual Status CreateDictionaryDecoder(uint8_t* values, int size, + DictDecoderBase** decoder) = 0; + + /// Return true if the column has a dictionary decoder. Subclass must implement this. + virtual bool HasDictionaryDecoder() = 0; + + /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass + /// must implement this. + virtual void ClearDictionaryDecoder() = 0; + + /// Initializes the reader with the data contents. This is the content for the entire + /// decompressed data page. Decoders can initialize state from here. The caller must + /// validate the input such that 'size' is non-negative and that 'data' has at least + /// 'size' bytes remaining. + virtual Status InitDataPage(uint8_t* data, int size) = 0; + + /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from + /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success, + /// 'buffer' points to the allocated memory. Otherwise an error status is returned. + Status AllocateUncompressedDataPage( + int64_t size, const char* err_ctx, uint8_t** buffer); + + /// Returns true if a data page for this column with the specified 'encoding' may + /// contain strings referenced by returned batches. 
Cases where this is not true are: + /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'. + /// * Fixed-length slots, where there is no string data. + bool PageContainsTupleData(parquet::Encoding::type page_encoding) { + return page_encoding != parquet::Encoding::PLAIN_DICTIONARY + && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType(); + } + + /// Slow-path status construction code for def/rep decoding errors. 'level_name' is + /// either "rep" or "def", 'decoded_level' is the value returned from + /// ParquetLevelDecoder::ReadLevel() and 'max_level' is the maximum allowed value. + void __attribute__((noinline)) SetLevelDecodeError(const char* level_name, + int decoded_level, int max_level); + + // Returns a detailed error message about unsupported encoding. + Status GetUnsupportedDecodingError(); +}; + +// Inline to allow inlining into collection and scalar column reader. +inline void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) { + // NextLevels() should have already been called + DCHECK_GE(rep_level_, 0); + DCHECK_GE(def_level_, 0); + DCHECK_GE(pos_current_value_, 0); + DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << + "Caller should have called NextLevels() until we are ready to read a value"; + *pos = pos_current_value_++; +} + +// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling +// path doesn't falsely report that the file is corrupted. +// Inlined to avoid overhead in release builds. +inline bool ParquetColumnReader::ColReaderDebugAction(int* val_count) { +#ifndef NDEBUG + Status status = parent_->ScannerDebugAction(); + if (!status.ok()) { + if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status); + *val_count = 0; + return false; + } +#endif + return true; +} + +// Trigger debug action on every other call of Read*ValueBatch() once at least 128 +// tuples have been produced to simulate failure such as exceeding memory limit. 
+// Triggering it every other call so as not to always fail on the first column reader +// when materializing multiple columns. Failing on non-empty row batch tests proper +// resources freeing by the Parquet scanner. +#ifndef NDEBUG +extern int parquet_column_reader_debug_count; +#define SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(num_tuples) \ + ((parquet_column_reader_debug_count++ % 2) == 1 && num_tuples >= 128) +#else +#define SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(x) (false) +#endif + +} // namespace impala + +#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-stats.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-column-stats.cc b/be/src/exec/parquet/parquet-column-stats.cc new file mode 100644 index 0000000..478bba4 --- /dev/null +++ b/be/src/exec/parquet/parquet-column-stats.cc @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet-column-stats.inline.h" + +#include <algorithm> +#include <cmath> +#include <limits> + +#include "common/names.h" + +namespace impala { + +bool ColumnStatsReader::ReadFromThrift(StatsField stats_field, void* slot) const { + if (!(col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics)) { + return false; + } + const parquet::Statistics& stats = col_chunk_.meta_data.statistics; + + // Try to read the requested stats field. If it is not set, we may fall back to reading + // the old stats, based on the column type. 
+ const string* stat_value = nullptr; + switch (stats_field) { + case StatsField::MIN: + if (stats.__isset.min_value && CanUseStats()) { + stat_value = &stats.min_value; + break; + } + if (stats.__isset.min && CanUseDeprecatedStats()) { + stat_value = &stats.min; + } + break; + case StatsField::MAX: + if (stats.__isset.max_value && CanUseStats()) { + stat_value = &stats.max_value; + break; + } + if (stats.__isset.max && CanUseDeprecatedStats()) { + stat_value = &stats.max; + } + break; + default: + DCHECK(false) << "Unsupported statistics field requested"; + } + if (stat_value == nullptr) return false; + + switch (col_type_.type) { + case TYPE_BOOLEAN: + return ColumnStats<bool>::DecodePlainValue(*stat_value, slot, + parquet::Type::BOOLEAN); + case TYPE_TINYINT: { + // parquet::Statistics encodes INT_8 values using 4 bytes. + int32_t col_stats; + bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats, + parquet::Type::INT32); + if (!ret || col_stats < std::numeric_limits<int8_t>::min() || + col_stats > std::numeric_limits<int8_t>::max()) { + return false; + } + *static_cast<int8_t*>(slot) = col_stats; + return true; + } + case TYPE_SMALLINT: { + // parquet::Statistics encodes INT_16 values using 4 bytes. 
+ int32_t col_stats; + bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats, + parquet::Type::INT32); + if (!ret || col_stats < std::numeric_limits<int16_t>::min() || + col_stats > std::numeric_limits<int16_t>::max()) { + return false; + } + *static_cast<int16_t*>(slot) = col_stats; + return true; + } + case TYPE_INT: + return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot, element_.type); + case TYPE_BIGINT: + return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot, element_.type); + case TYPE_FLOAT: + // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN + return ColumnStats<float>::DecodePlainValue(*stat_value, slot, element_.type) + && !std::isnan(*reinterpret_cast<float*>(slot)); + case TYPE_DOUBLE: + // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN + return ColumnStats<double>::DecodePlainValue(*stat_value, slot, element_.type) + && !std::isnan(*reinterpret_cast<double*>(slot)); + case TYPE_TIMESTAMP: + return DecodeTimestamp(*stat_value, stats_field, + static_cast<TimestampValue*>(slot)); + case TYPE_STRING: + case TYPE_VARCHAR: + return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot, element_.type); + case TYPE_CHAR: + /// We don't read statistics for CHAR columns, since CHAR support is broken in + /// Impala (IMPALA-1652). 
+ return false; + case TYPE_DECIMAL: + switch (col_type_.GetByteSize()) { + case 4: + return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, slot, + element_.type); + case 8: + return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, slot, + element_.type); + case 16: + return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, slot, + element_.type); + } + DCHECK(false) << "Unknown decimal byte size: " << col_type_.GetByteSize(); + default: + DCHECK(false) << col_type_.DebugString(); + } + return false; +} + +bool ColumnStatsReader::DecodeTimestamp(const std::string& stat_value, + ColumnStatsReader::StatsField stats_field, TimestampValue* slot) const { + bool stats_read = false; + if (element_.type == parquet::Type::INT96) { + stats_read = + ColumnStats<TimestampValue>::DecodePlainValue(stat_value, slot, element_.type); + } else if (element_.type == parquet::Type::INT64) { + int64_t tmp; + stats_read = ColumnStats<int64_t>::DecodePlainValue(stat_value, &tmp, element_.type); + if (stats_read) *slot = timestamp_decoder_.Int64ToTimestampValue(tmp); + } else { + DCHECK(false) << element_.name; + return false; + } + + if (stats_read && timestamp_decoder_.NeedsConversion()) { + if (stats_field == ColumnStatsReader::StatsField::MIN) { + timestamp_decoder_.ConvertMinStatToLocalTime(slot); + } else { + timestamp_decoder_.ConvertMaxStatToLocalTime(slot); + } + } + return stats_read && slot->HasDateAndTime(); +} + +bool ColumnStatsReader::ReadNullCountStat(int64_t* null_count) const { + if (!(col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics)) { + return false; + } + const parquet::Statistics& stats = col_chunk_.meta_data.statistics; + if (stats.__isset.null_count) { + *null_count = stats.null_count; + return true; + } + return false; +} + +Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) { + if (value->ptr == buffer->buffer()) return Status::OK(); + buffer->Clear(); + 
RETURN_IF_ERROR(buffer->Append(value->ptr, value->len)); + value->ptr = buffer->buffer(); + return Status::OK(); +} + +bool ColumnStatsReader::CanUseStats() const { + // If column order is not set, only statistics for numeric types can be trusted. + if (col_order_ == nullptr) { + return col_type_.IsBooleanType() || col_type_.IsIntegerType() + || col_type_.IsFloatingPointType(); + } + // Stats can be used if the column order is TypeDefinedOrder (see parquet.thrift). + return col_order_->__isset.TYPE_ORDER; +} + +bool ColumnStatsReader::CanUseDeprecatedStats() const { + // If column order is set to something other than TypeDefinedOrder, we shall not use the + // stats (see parquet.thrift). + if (col_order_ != nullptr && !col_order_->__isset.TYPE_ORDER) return false; + return col_type_.IsBooleanType() || col_type_.IsIntegerType() + || col_type_.IsFloatingPointType(); +} + +} // end ns impala http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-stats.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-column-stats.h b/be/src/exec/parquet/parquet-column-stats.h new file mode 100644 index 0000000..c259c7a --- /dev/null +++ b/be/src/exec/parquet/parquet-column-stats.h @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H
#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H

#include <string>
#include <type_traits>

#include "exec/parquet/parquet-common.h"
#include "runtime/decimal-value.h"
#include "runtime/string-buffer.h"
#include "runtime/timestamp-value.h"
#include "runtime/types.h"

#include "gen-cpp/parquet_types.h"

namespace impala {

/// This class, together with its derivatives, is used to update column statistics when
/// writing parquet files. It provides an interface to populate a parquet::Statistics
/// object and attach it to an object supplied by the caller. It can also be used to
/// decode parquet::Statistics into slots.
///
/// We currently support writing the 'min_value', 'max_value' and 'null_count' fields in
/// parquet::Statistics. The remaining statistical value - 'distinct_count' - is not
/// tracked or populated. We do not populate the deprecated 'min' and 'max' fields.
///
/// Regarding the ordering of values, we follow the parquet-format specification for
/// logical types (LogicalTypes.md in parquet-format):
///
/// - Numeric values (BOOLEAN, INT, FLOAT, DOUBLE, DECIMAL) are ordered by their numeric
/// value (as opposed to their binary representation).
///
/// - Strings are ordered using bytewise, unsigned comparison.
///
/// - Timestamps are compared by numerically comparing the points in time they represent.
///
/// NULL values are not considered for min/max statistics, and if a column consists only
/// of NULL values, then no min/max statistics are written.
///
/// Updating the statistics is handled in derived classes to alleviate the need for
/// virtual function calls.
///
/// TODO: Populate distinct_count.
class ColumnStatsBase {
 public:
  /// min and max functions for types that are not floating point numbers.
  /// Compare() returns -1, 0 or 1 when 'a' is less than, equal to or greater than 'b'.
  /// NOTE(review): std::min/std::max (and std::fmin/std::fmax/std::isnan below) are not
  /// included by this header directly; they presumably arrive transitively - confirm.
  template <typename T, typename Enable = void>
  struct MinMaxTrait {
    static decltype(auto) MinValue(const T& a, const T& b) { return std::min(a, b); }
    static decltype(auto) MaxValue(const T& a, const T& b) { return std::max(a, b); }
    static int Compare(const T& a, const T& b) {
      if (a < b) return -1;
      if (a > b) return 1;
      return 0;
    }
  };

  /// min and max functions for floating point types. std::fmin/std::fmax return the
  /// non-NaN operand when exactly one operand is NaN, so a NaN only becomes the min/max
  /// if every value is NaN. In Compare(), two NaNs compare equal and a NaN compares
  /// smaller than any non-NaN value (fmax() then returns the other operand).
  template <typename T>
  struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> {
    static decltype(auto) MinValue(const T& a, const T& b) { return std::fmin(a, b); }
    static decltype(auto) MaxValue(const T& a, const T& b) { return std::fmax(a, b); }
    static int Compare(const T& a, const T& b) {
      //TODO: Should be aligned with PARQUET-1222, once resolved
      if (a == b) return 0;
      if (std::isnan(a) && std::isnan(b)) return 0;
      if (MaxValue(a, b) == a) return 1;
      return -1;
    }
  };

  ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
  virtual ~ColumnStatsBase() {}

  /// Merges this statistics object with values from 'other'. If other has not been
  /// initialized, then this object will not be changed. It maintains internal state that
  /// tracks whether the min/max values are ordered.
  virtual void Merge(const ColumnStatsBase& other) = 0;

  /// Copies the contents of this object's statistics values to internal buffers. Some
  /// data types (e.g. StringValue) need to be copied at the end of processing a row
  /// batch, since the batch memory will be released. Override this method in derived
  /// classes to provide the functionality.
  virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT {
    return Status::OK();
  }

  /// Returns the number of bytes needed to encode the current statistics into a
  /// parquet::Statistics object.
  virtual int64_t BytesNeeded() const = 0;

  /// Encodes the current values into a Statistics thrift message.
  virtual void EncodeToThrift(parquet::Statistics* out) const = 0;

  /// Resets the state of this object.
  void Reset();

  /// Updates the statistics by incrementing the null_count. It is called each time a
  /// null value is appended to the column or the statistics are merged.
  void IncrementNullCount(int64_t count) { null_count_ += count; }

  /// Returns the boundary order of the pages. That is, whether the lists of min/max
  /// elements inside the ColumnIndex are ordered and if so, in which direction.
  /// If both 'ascending_boundary_order_' and 'descending_boundary_order_' are true,
  /// all elements are equal and we choose ascending order.
  /// If exactly one flag is true, the corresponding ordering is returned; if both are
  /// false, the pages are reported as unordered.
  parquet::BoundaryOrder::type GetBoundaryOrder() const {
    if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING;
    if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING;
    return parquet::BoundaryOrder::UNORDERED;
  }

 protected:
  // Copies the memory of 'value' into 'buffer' and makes 'value' point to 'buffer'.
  // 'buffer' is reset before making the copy.
  static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT;

  /// Stores whether the min and max values of the current object have been initialized.
  bool has_min_max_values_;

  // Number of null values since the last call to Reset().
  int64_t null_count_;

  // If true, min/max values are ascending.
  // We assume the values are ascending, so start with true and only make it false when
  // we find a descending value. If not all values are equal, then at least one of
  // 'ascending_boundary_order_' and 'descending_boundary_order_' will be false.
  bool ascending_boundary_order_ = true;

  // If true, min/max values are descending.
  // See description of 'ascending_boundary_order_'.
  bool descending_boundary_order_ = true;
};

/// This class contains behavior specific to our in-memory formats for different types.
template <typename T>
class ColumnStats : public ColumnStatsBase {
  friend class ColumnStatsBase;
  // We explicitly require types to be listed here in order to support column statistics.
  // When adding a type here, users of this class need to ensure that the statistics
  // follow the ordering semantics of parquet's min/max statistics for the new type.
  // Details on how the values should be ordered can be found in the 'parquet-format'
  // project in 'parquet.thrift' and 'LogicalTypes.md'.
  // Instantiating ColumnStats with any other T fails to compile because enable_if has
  // no 'type' member then.
  using value_type = typename std::enable_if<
      std::is_arithmetic<T>::value
          || std::is_same<bool, T>::value
          || std::is_same<StringValue, T>::value
          || std::is_same<TimestampValue, T>::value
          || std::is_same<Decimal4Value, T>::value
          || std::is_same<Decimal8Value, T>::value
          || std::is_same<Decimal16Value, T>::value,
      T>::type;

 public:
  /// 'mem_pool' is used to materialize string values so that the user of this class can
  /// free the memory of the original values.
  /// 'plain_encoded_value_size' specifies the size of each encoded value in plain
  /// encoding, -1 if the type is variable-length.
  ColumnStats(MemPool* mem_pool, int plain_encoded_value_size)
    : ColumnStatsBase(),
      plain_encoded_value_size_(plain_encoded_value_size),
      mem_pool_(mem_pool),
      min_buffer_(mem_pool),
      max_buffer_(mem_pool),
      prev_page_min_buffer_(mem_pool),
      prev_page_max_buffer_(mem_pool) {}

  /// Updates the statistics based on the values min_value and max_value. If necessary,
  /// initializes the statistics. It may keep a reference to either value until
  /// MaterializeStringValuesToInternalBuffers() gets called.
  void Update(const T& min_value, const T& max_value);

  /// Wrapper to call the Update function which takes in the min_value and max_value.
  void Update(const T& v) { Update(v, v); }

  virtual void Merge(const ColumnStatsBase& other) override;
  virtual Status MaterializeStringValuesToInternalBuffers() override {
    return Status::OK();
  }

  virtual int64_t BytesNeeded() const override;
  virtual void EncodeToThrift(parquet::Statistics* out) const override;

  /// Decodes the plain encoded stats value from 'buffer' and writes the result into the
  /// buffer pointed to by 'slot'. Returns true if decoding was successful, false
  /// otherwise. For timestamps, an additional validation will be performed.
  static bool DecodePlainValue(const std::string& buffer, void* slot,
      parquet::Type::type parquet_type);

 protected:
  /// Encodes a single value using parquet's plain encoding and stores it into the binary
  /// string 'out'. String values are stored without additional encoding. 'bytes_needed'
  /// must be positive.
  static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* out);

  /// Returns the number of bytes needed to encode value 'v'.
  int64_t BytesNeeded(const T& v) const;

  // Size of each encoded value in plain encoding, -1 if the type is variable-length.
  int plain_encoded_value_size_;

  // Minimum value since the last call to Reset().
  T min_value_;

  // Maximum value since the last call to Reset().
  T max_value_;

  // Minimum value of the previous page. Need to store that to calculate boundary order.
  T prev_page_min_value_;

  // Maximum value of the previous page. Need to store that to calculate boundary order.
  T prev_page_max_value_;

  // Memory pool to allocate from when making copies of the statistics data.
  MemPool* mem_pool_;

  // Local buffers to copy statistics data into.
  StringBuffer min_buffer_;
  StringBuffer max_buffer_;
  StringBuffer prev_page_min_buffer_;
  StringBuffer prev_page_max_buffer_;
};

/// Class that handles the decoding of Parquet stats (min/max/null_count) for a given
/// column chunk.
/// The reader stores references to 'col_chunk', 'col_type' and 'element' (and the
/// 'col_order' pointer) passed to the constructor; they must outlive the reader.
class ColumnStatsReader {
public:
  /// Enum to select whether to read minimum or maximum statistics. Values do not
  /// correspond to fields in parquet::Statistics, but instead select between retrieving
  /// the minimum or maximum value.
  enum class StatsField { MIN, MAX };

  ColumnStatsReader(const parquet::ColumnChunk& col_chunk, const ColumnType& col_type,
      const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element)
    : col_chunk_(col_chunk),
      col_type_(col_type),
      col_order_(col_order),
      element_(element) {}

  /// Sets extra information that is only needed for decoding TIMESTAMP stats.
  void SetTimestampDecoder(ParquetTimestampDecoder timestamp_decoder) {
    timestamp_decoder_ = timestamp_decoder;
  }

  /// Decodes the parquet::Statistics from 'col_chunk_' and writes the value selected by
  /// 'stats_field' into the buffer pointed to by 'slot', based on 'col_type_'. Returns
  /// true if reading statistics for columns of type 'col_type_' is supported and decoding
  /// was successful, false otherwise.
  bool ReadFromThrift(StatsField stats_field, void* slot) const;

  /// Gets the null_count statistics from the column chunk's metadata and returns
  /// it via an output parameter.
  /// Returns true if the null_count stats were read successfully, false otherwise.
  bool ReadNullCountStat(int64_t* null_count) const;

private:
  /// Returns true if we support reading statistics stored in the fields 'min_value' and
  /// 'max_value' in parquet::Statistics for the type 'col_type_' and the column order
  /// 'col_order_'. Otherwise, returns false. If 'col_order_' is nullptr, only primitive
  /// numeric types are supported.
  bool CanUseStats() const;

  /// Returns true if we consider statistics stored in the deprecated fields 'min' and
  /// 'max' in parquet::Statistics to be correct for the type 'col_type_' and the column
  /// order 'col_order_'. Otherwise, returns false.
  bool CanUseDeprecatedStats() const;

  /// Decodes 'stat_value' and does INT64->TimestampValue and timezone conversions if
  /// necessary. Returns true if the decoding and conversions were successful.
  bool DecodeTimestamp(const std::string& stat_value,
      ColumnStatsReader::StatsField stats_field,
      TimestampValue* slot) const;

  const parquet::ColumnChunk& col_chunk_;
  const ColumnType& col_type_;
  const parquet::ColumnOrder* col_order_;
  const parquet::SchemaElement& element_;
  ParquetTimestampDecoder timestamp_decoder_;
};
} // end ns impala
#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-column-stats.inline.h b/be/src/exec/parquet/parquet-column-stats.inline.h
new file mode 100644
index 0000000..933b3ae
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-stats.inline.h
@@ -0,0 +1,254 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H +#define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H + +#include "exec/parquet/parquet-common.h" +#include "gen-cpp/parquet_types.h" +#include "parquet-column-stats.h" +#include "runtime/string-value.inline.h" + +namespace impala { + +inline void ColumnStatsBase::Reset() { + has_min_max_values_ = false; + null_count_ = 0; + ascending_boundary_order_ = true; + descending_boundary_order_ = true; +} + +template <typename T> +inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) { + if (!has_min_max_values_) { + has_min_max_values_ = true; + min_value_ = min_value; + max_value_ = max_value; + } else { + min_value_ = MinMaxTrait<T>::MinValue(min_value_, min_value); + max_value_ = MinMaxTrait<T>::MaxValue(max_value_, max_value); + } +} + +template <typename T> +inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) { + DCHECK(dynamic_cast<const ColumnStats<T>*>(&other)); + const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other); + if (cs->has_min_max_values_) { + if (has_min_max_values_) { + if (ascending_boundary_order_) { + if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) > 0 || + MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) > 0) { + ascending_boundary_order_ = false; + } + } + if (descending_boundary_order_) { + if (MinMaxTrait<T>::Compare(prev_page_max_value_, 
cs->max_value_) < 0 || + MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) < 0) { + descending_boundary_order_ = false; + } + } + } + Update(cs->min_value_, cs->max_value_); + prev_page_min_value_ = cs->min_value_; + prev_page_max_value_ = cs->max_value_; + } + IncrementNullCount(cs->null_count_); +} + +template <typename T> +inline int64_t ColumnStats<T>::BytesNeeded() const { + return BytesNeeded(min_value_) + BytesNeeded(max_value_) + + ParquetPlainEncoder::ByteSize(null_count_); +} + +template <typename T> +inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const { + if (has_min_max_values_) { + std::string min_str; + EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str); + out->__set_min_value(move(min_str)); + std::string max_str; + EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str); + out->__set_max_value(move(max_str)); + } + out->__set_null_count(null_count_); +} + +template <typename T> +inline void ColumnStats<T>::EncodePlainValue( + const T& v, int64_t bytes_needed, std::string* out) { + DCHECK_GT(bytes_needed, 0); + out->resize(bytes_needed); + const int64_t bytes_written = ParquetPlainEncoder::Encode( + v, bytes_needed, reinterpret_cast<uint8_t*>(&(*out)[0])); + DCHECK_EQ(bytes_needed, bytes_written); +} + +template <typename T> +inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot, + parquet::Type::type parquet_type) { + T* result = reinterpret_cast<T*>(slot); + int size = buffer.size(); + const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data()); + if (ParquetPlainEncoder::DecodeByParquetType<T>(data, data + size, size, result, + parquet_type) == -1) { + return false; + } + return true; +} + +template <typename T> +inline int64_t ColumnStats<T>::BytesNeeded(const T& v) const { + return plain_encoded_value_size_ < 0 ? 
ParquetPlainEncoder::ByteSize<T>(v) : + plain_encoded_value_size_; +} + +/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus +/// needs special handling here. +template <> +inline void ColumnStats<bool>::EncodePlainValue( + const bool& v, int64_t bytes_needed, std::string* out) { + char c = v; + out->assign(1, c); +} + +template <> +inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot, + parquet::Type::type parquet_type) { + bool* result = reinterpret_cast<bool*>(slot); + DCHECK(buffer.size() == 1); + *result = (buffer[0] != 0); + return true; +} + +template <> +inline int64_t ColumnStats<bool>::BytesNeeded(const bool& v) const { + return 1; +} + +/// Timestamp values need validation. +template <> +inline bool ColumnStats<TimestampValue>::DecodePlainValue( + const std::string& buffer, void* slot, parquet::Type::type parquet_type) { + TimestampValue* result = reinterpret_cast<TimestampValue*>(slot); + int size = buffer.size(); + const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data()); + if (parquet_type == parquet::Type::INT96) { + if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(data, + data + size, size, result) == -1) { + return false; + } + } else { + DCHECK(false); + return false; + } + + // We don't need to convert the value here, because it is done by the caller. + // If this function were not static, then it would be possible to store the information + // needed for timezone conversion in the object and do the conversion here. + return TimestampValue::IsValidDate(result->date()); +} + +/// parquet::Statistics stores string values directly and does not use plain encoding. 
+template <> +inline void ColumnStats<StringValue>::EncodePlainValue( + const StringValue& v, int64_t bytes_needed, string* out) { + out->assign(v.ptr, v.len); +} + +template <> +inline bool ColumnStats<StringValue>::DecodePlainValue( + const std::string& buffer, void* slot, parquet::Type::type parquet_type) { + StringValue* result = reinterpret_cast<StringValue*>(slot); + result->ptr = const_cast<char*>(buffer.data()); + result->len = buffer.size(); + return true; +} + +template <> +inline void ColumnStats<StringValue>::Update( + const StringValue& min_value, const StringValue& max_value) { + if (!has_min_max_values_) { + has_min_max_values_ = true; + min_value_ = min_value; + min_buffer_.Clear(); + max_value_ = max_value; + max_buffer_.Clear(); + } else { + if (min_value < min_value_) { + min_value_ = min_value; + min_buffer_.Clear(); + } + if (max_value > max_value_) { + max_value_ = max_value; + max_buffer_.Clear(); + } + } +} + +template <> +inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) { + DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other)); + const ColumnStats<StringValue>* cs = static_cast< + const ColumnStats<StringValue>*>(&other); + if (cs->has_min_max_values_) { + if (has_min_max_values_) { + // Make sure that we copied the previous page's min/max values to their own buffer. 
+ DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr), + static_cast<void*>(cs->min_value_.ptr)); + DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr), + static_cast<void*>(cs->max_value_.ptr)); + if (ascending_boundary_order_) { + if (prev_page_max_value_ > cs->max_value_ || + prev_page_min_value_ > cs->min_value_) { + ascending_boundary_order_ = false; + } + } + if (descending_boundary_order_) { + if (prev_page_max_value_ < cs->max_value_ || + prev_page_min_value_ < cs->min_value_) { + descending_boundary_order_ = false; + } + } + } + Update(cs->min_value_, cs->max_value_); + prev_page_min_value_ = cs->min_value_; + prev_page_max_value_ = cs->max_value_; + prev_page_min_buffer_.Clear(); + prev_page_max_buffer_.Clear(); + } + IncrementNullCount(cs->null_count_); +} + +// StringValues need to be copied at the end of processing a row batch, since the batch +// memory will be released. +template <> +inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() { + if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_)); + if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_)); + if (prev_page_min_buffer_.IsEmpty()) { + RETURN_IF_ERROR(CopyToBuffer(&prev_page_min_buffer_, &prev_page_min_value_)); + } + if (prev_page_max_buffer_.IsEmpty()) { + RETURN_IF_ERROR(CopyToBuffer(&prev_page_max_buffer_, &prev_page_max_value_)); + } + return Status::OK(); +} + +} // end ns impala +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-common.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc new file mode 100644 index 0000000..439d351 --- /dev/null +++ b/be/src/exec/parquet/parquet-common.cc @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
// See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/parquet/parquet-common.h"

namespace impala {

/// Mapping of impala's internal types to parquet storage types. This is indexed by
/// PrimitiveType enum, so the order of the entries must match that enum exactly.
const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
  parquet::Type::BOOLEAN,     // Invalid
  parquet::Type::BOOLEAN,     // NULL type
  parquet::Type::BOOLEAN,
  parquet::Type::INT32,
  parquet::Type::INT32,
  parquet::Type::INT32,
  parquet::Type::INT64,
  parquet::Type::FLOAT,
  parquet::Type::DOUBLE,
  parquet::Type::INT96,       // Timestamp
  parquet::Type::BYTE_ARRAY,  // String
  parquet::Type::BYTE_ARRAY,  // Date, NYI
  parquet::Type::BYTE_ARRAY,  // DateTime, NYI
  parquet::Type::BYTE_ARRAY,  // Binary NYI
  parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
  parquet::Type::BYTE_ARRAY,  // VARCHAR(N)
  parquet::Type::BYTE_ARRAY,  // CHAR(N)
};

// Number of entries in INTERNAL_TO_PARQUET_TYPES; used for bounds DCHECKs.
const int INTERNAL_TO_PARQUET_TYPES_SIZE =
    sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]);

/// Mapping of Parquet codec enums to Impala enums. Indexed by the Parquet codec value,
/// so the order of the entries matters.
const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
  THdfsCompression::NONE,
  THdfsCompression::SNAPPY,
  THdfsCompression::GZIP,
  THdfsCompression::LZO
};

// Number of entries in PARQUET_TO_IMPALA_CODEC; used for bounds DCHECKs.
const int PARQUET_TO_IMPALA_CODEC_SIZE =
    sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]);

/// Mapping of Impala codec enums to Parquet enums. Indexed by the Impala codec value,
/// so the order of the entries matters.
const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
  parquet::CompressionCodec::UNCOMPRESSED,
  parquet::CompressionCodec::SNAPPY,  // DEFAULT
  parquet::CompressionCodec::GZIP,    // GZIP
  parquet::CompressionCodec::GZIP,    // DEFLATE
  parquet::CompressionCodec::SNAPPY,
  parquet::CompressionCodec::SNAPPY,  // SNAPPY_BLOCKED
  parquet::CompressionCodec::LZO,
};

// Number of entries in IMPALA_TO_PARQUET_CODEC; used for bounds DCHECKs.
const int IMPALA_TO_PARQUET_CODEC_SIZE =
    sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);

// Table lookup; out-of-range values are only caught by DCHECKs (debug builds).
parquet::Type::type ConvertInternalToParquetType(PrimitiveType type) {
  DCHECK_GE(type, 0);
  DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE);
  return INTERNAL_TO_PARQUET_TYPES[type];
}

// Table lookup; out-of-range values are only caught by DCHECKs (debug builds).
THdfsCompression::type ConvertParquetToImpalaCodec(
    parquet::CompressionCodec::type codec) {
  DCHECK_GE(codec, 0);
  DCHECK_LT(codec, PARQUET_TO_IMPALA_CODEC_SIZE);
  return PARQUET_TO_IMPALA_CODEC[codec];
}

// Table lookup; out-of-range values are only caught by DCHECKs (debug builds).
parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
    THdfsCompression::type codec) {
  DCHECK_GE(codec, 0);
  DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE);
  return IMPALA_TO_PARQUET_CODEC[codec];
}

// Derives the timestamp precision and whether UTC->local conversion is needed from the
// schema element 'e'. Precedence: logical type, then converted type, then INT96.
// 'timezone' is only stored when a conversion is needed.
ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& e,
    const Timezone* timezone, bool convert_int96_timestamps) {
  bool needs_conversion = false;
  if (e.__isset.logicalType) {
    DCHECK(e.logicalType.__isset.TIMESTAMP);
    needs_conversion = e.logicalType.TIMESTAMP.isAdjustedToUTC;
    precision_ = e.logicalType.TIMESTAMP.unit.__isset.MILLIS
        ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
  } else {
    if (e.__isset.converted_type) {
      // Timestamps with a converted type but without a logical type are/were never
      // written by Impala, so it is assumed that the writer is Parquet-mr and that
      // timezone conversion is needed.
      needs_conversion = true;
      precision_ = e.converted_type == parquet::ConvertedType::TIMESTAMP_MILLIS
          ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
    } else {
      // INT96 timestamps need conversion depending on the writer.
      needs_conversion = convert_int96_timestamps;
      precision_ = ParquetTimestampDecoder::NANO;
    }
  }
  if (needs_conversion) timezone_ = timezone;
}

// Converts a min statistic from UTC to local time in place; values without both date
// and time are left untouched. If UtcToLocal() reports a 'repeated period start', that
// value replaces the result. NOTE(review): presumably the start of an ambiguous
// (repeated) local-time period, used to keep the converted min a lower bound - confirm
// against TimestampValue::UtcToLocal.
void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) const {
  DCHECK(timezone_ != nullptr);
  if (!v->HasDateAndTime()) return;
  TimestampValue repeated_period_start;
  v->UtcToLocal(*timezone_, &repeated_period_start);
  if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
}

// Max-statistic counterpart of ConvertMinStatToLocalTime(): uses the 'repeated period
// end' reported by UtcToLocal() when present. NOTE(review): presumably keeps the
// converted max an upper bound - confirm against TimestampValue::UtcToLocal.
void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) const {
  DCHECK(timezone_ != nullptr);
  if (!v->HasDateAndTime()) return;
  TimestampValue repeated_period_end;
  v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
  if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
new file mode 100644
index 0000000..79baeda
--- /dev/null
+++ b/be/src/exec/parquet/parquet-common.h
@@ -0,0 +1,565 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_EXEC_PARQUET_COMMON_H +#define IMPALA_EXEC_PARQUET_COMMON_H + +#include "common/compiler-util.h" +#include "gen-cpp/Descriptors_types.h" +#include "gen-cpp/parquet_types.h" +#include "runtime/decimal-value.h" +#include "runtime/string-value.h" +#include "runtime/timestamp-value.inline.h" +#include "util/bit-util.h" +#include "util/decimal-util.h" +#include "util/mem-util.h" + +/// This file contains common elements between the parquet Writer and Scanner. +namespace impala { + +const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'}; +const uint32_t PARQUET_CURRENT_VERSION = 1; + +/// Return the Parquet type corresponding to Impala's internal type. The caller must +/// validate that the type is valid, otherwise this will DCHECK. +parquet::Type::type ConvertInternalToParquetType(PrimitiveType type); + +/// Return the Impala compression type for the given Parquet codec. The caller must +/// validate that the codec is a supported one, otherwise this will DCHECK. +THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec); + +/// Return the Parquet code for the given Impala compression type. The caller must +/// validate that the codec is a supported one, otherwise this will DCHECK. +parquet::CompressionCodec::type ConvertImpalaToParquetCodec( + THdfsCompression::type codec); + +/// The plain encoding does not maintain any state so all these functions +/// are static helpers. +/// TODO: we are using templates to provide a generic interface (over the +/// types) to avoid performance penalties. 
This makes the code more complex +/// and should be removed when we have codegen support to inline virtual +/// calls. +class ParquetPlainEncoder { + public: + /// Returns the byte size of 'v' where InternalType is the datatype that Impala uses + /// internally to store tuple data. Used in some template function implementations to + /// determine the encoded byte size for fixed-length types. + template <typename InternalType> + static int ByteSize(const InternalType& v) { return sizeof(InternalType); } + + /// Returns the encoded size of values of type t. Returns -1 if it is variable + /// length. This can be different than the slot size of the types. + static int EncodedByteSize(const ColumnType& t) { + switch (t.type) { + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: + // CHAR is varlen here because we don't write the padding to the file + return -1; + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_FLOAT: + return 4; + case TYPE_BIGINT: + case TYPE_DOUBLE: + return 8; + case TYPE_TIMESTAMP: + return 12; + case TYPE_DECIMAL: + return DecimalSize(t); + case TYPE_NULL: + case TYPE_BOOLEAN: // These types are not plain encoded. + default: + DCHECK(false); + return -1; + } + } + + /// The minimum byte size to store decimals of with precision t.precision. + static int DecimalSize(const ColumnType& t) { + DCHECK(t.type == TYPE_DECIMAL); + // Numbers in the comment is the max positive value that can be represented + // with those number of bits (max negative is -(X + 1)). + // TODO: use closed form for this? 
+ switch (t.precision) { + case 1: case 2: + return 1; // 127 + case 3: case 4: + return 2; // 32,767 + case 5: case 6: + return 3; // 8,388,607 + case 7: case 8: case 9: + return 4; // 2,147,483,427 + case 10: case 11: + return 5; // 549,755,813,887 + case 12: case 13: case 14: + return 6; // 140,737,488,355,327 + case 15: case 16: + return 7; // 36,028,797,018,963,967 + case 17: case 18: + return 8; // 9,223,372,036,854,775,807 + case 19: case 20: case 21: + return 9; // 2,361,183,241,434,822,606,847 + case 22: case 23: + return 10; // 604,462,909,807,314,587,353,087 + case 24: case 25: case 26: + return 11; // 154,742,504,910,672,534,362,390,527 + case 27: case 28: + return 12; // 39,614,081,257,132,168,796,771,975,167 + case 29: case 30: case 31: + return 13; // 10,141,204,801,825,835,211,973,625,643,007 + case 32: case 33: + return 14; // 2,596,148,429,267,413,814,265,248,164,610,047 + case 34: case 35: + return 15; // 664,613,997,892,457,936,451,903,530,140,172,287 + case 36: case 37: case 38: + return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727 + default: + DCHECK(false); + break; + } + return -1; + } + + /// Encodes t into buffer. Returns the number of bytes added. buffer must + /// be preallocated and big enough. Buffer need not be aligned. + /// 'fixed_len_size' is only applicable for data encoded using FIXED_LEN_BYTE_ARRAY and + /// is the number of bytes the plain encoder should use. 
+ template <typename InternalType> + static int Encode(const InternalType& t, int fixed_len_size, uint8_t* buffer) { + memcpy(buffer, &t, ByteSize(t)); + return ByteSize(t); + } + + template <typename InternalType> + static int DecodeByParquetType(const uint8_t* buffer, const uint8_t* buffer_end, + int fixed_len_size, InternalType* v, parquet::Type::type parquet_type) { + switch (parquet_type) { + case parquet::Type::BOOLEAN: + return ParquetPlainEncoder::Decode<InternalType, parquet::Type::BOOLEAN>(buffer, + buffer_end, fixed_len_size, v); + case parquet::Type::INT32: + return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT32>(buffer, + buffer_end, fixed_len_size, v); + case parquet::Type::INT64: + return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT64>(buffer, + buffer_end, fixed_len_size, v); + case parquet::Type::INT96: + return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT96>(buffer, + buffer_end, fixed_len_size, v); + case parquet::Type::FLOAT: + return ParquetPlainEncoder::Decode<InternalType, parquet::Type::FLOAT>(buffer, + buffer_end, fixed_len_size, v); + case parquet::Type::DOUBLE: + return ParquetPlainEncoder::Decode<InternalType, parquet::Type::DOUBLE>(buffer, + buffer_end, fixed_len_size, v); + case parquet::Type::BYTE_ARRAY: + return ParquetPlainEncoder::Decode<InternalType, + parquet::Type::BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v); + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + return ParquetPlainEncoder::Decode<InternalType, + parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v); + default: + DCHECK(false) << "Unexpected physical type"; + return -1; + } + } + + /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer' + /// need not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size' + /// is the size of the object. Otherwise, it is unused. + /// Returns the number of bytes read or -1 if the value was not decoded successfully. 
+  /// This generic template function is used with the following types:
+  /// =============================
+  /// InternalType   | PARQUET_TYPE
+  /// =============================
+  /// int32_t        | INT32
+  /// int64_t        | INT64
+  /// float          | FLOAT
+  /// double         | DOUBLE
+  /// Decimal4Value  | INT32
+  /// Decimal8Value  | INT64
+  /// TimestampValue | INT96
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+      InternalType* v) {
+    int byte_size = ByteSize(*v);
+    if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+    memcpy(v, buffer, byte_size);
+    return byte_size;
+  }
+
+  /// Batched version of Decode() that tries to decode 'num_values' values from the memory
+  /// range [buffer, buffer_end) and writes them to 'v' with a stride of 'stride' bytes.
+  /// Returns the number of bytes read from 'buffer' or -1 if there was an error
+  /// decoding, e.g. invalid data or running out of input data before reading
+  /// 'num_values'.
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
+      int fixed_len_size, int64_t num_values, int64_t stride, InternalType* v);
+};
+
+/// Calling this with arguments of type ColumnType is certainly a programmer error, so we
+/// disallow it.
+template <> int ParquetPlainEncoder::ByteSize(const ColumnType& t);
+
+/// Disable for bools. Plain encoding is not used for booleans.
+template <> int ParquetPlainEncoder::ByteSize(const bool& b);
+template <> int ParquetPlainEncoder::Encode(const bool&, int fixed_len_size, uint8_t*);
+template <> int ParquetPlainEncoder::Decode<bool, parquet::Type::BOOLEAN>(const uint8_t*,
+    const uint8_t*, int fixed_len_size, bool* v);
+
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
+  // Only used when the decimal is stored as INT32.
+  return sizeof(Decimal4Value::StorageType);
+}
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) {
+  // Only used when the decimal is stored as INT64.
+  return sizeof(Decimal8Value::StorageType);
+}
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) {
+  // Not used, since such big decimals can only be stored as BYTE_ARRAY or
+  // FIXED_LEN_BYTE_ARRAY.
+  DCHECK(false);
+  return -1;
+}
+
+/// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit.
+template <>
+inline int ParquetPlainEncoder::ByteSize(const int8_t& v) { return sizeof(int32_t); }
+template <>
+inline int ParquetPlainEncoder::ByteSize(const int16_t& v) { return sizeof(int32_t); }
+
+/// A plain-encoded BYTE_ARRAY is a 4-byte length prefix followed by the bytes.
+template <>
+inline int ParquetPlainEncoder::ByteSize(const StringValue& v) {
+  return sizeof(int32_t) + v.len;
+}
+
+template <>
+inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
+  return 12;
+}
+
+/// Reads one 'From' value (sizeof(From) raw bytes) from 'buffer' and stores it into '*v'
+/// through an implicit From -> To conversion. Used for lossless type widening.
+/// Returns sizeof(From), or -1 if fewer than sizeof(From) bytes remain.
+template <typename From, typename To>
+inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* buffer_end, To* v) {
+  int byte_size = sizeof(From);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  From dest;
+  memcpy(&dest, buffer, byte_size);
+  *v = dest;
+  return byte_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int64_t* v) {
+  return DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) {
+  return DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::FLOAT>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) {
+  return DecodeWithConversion<float, double>(buffer, buffer_end, v);
+}
+
+/// TINYINT is stored as a 4-byte INT32. Only the first byte of the value is copied.
+/// Parquet's plain encoding is little-endian, so this is the least-significant byte.
+/// All 4 bytes are still consumed.
+template <>
+inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int8_t* v) {
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  *v = *buffer;
+  return byte_size;
+}
+/// SMALLINT is stored as a 4-byte INT32. The first two bytes are copied, which assumes a
+/// little-endian host like the memcpy-based decoders above. All 4 bytes are consumed.
+template <>
+inline int ParquetPlainEncoder::Decode<int16_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int16_t* v) {
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  memcpy(v, buffer, sizeof(int16_t));
+  return byte_size;
+}
+
+/// Widens 'v' to int32_t and writes the 4 bytes to 'buffer'. Returns ByteSize(v), which
+/// is sizeof(int32_t) for the int8_t/int16_t callers below.
+template<typename T>
+inline int EncodeToInt32(const T& v, int fixed_len_size, uint8_t* buffer) {
+  int32_t val = v;
+  memcpy(buffer, &val, sizeof(int32_t));
+  return ParquetPlainEncoder::ByteSize(v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const int8_t& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeToInt32(v, fixed_len_size, buffer);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const int16_t& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeToInt32(v, fixed_len_size, buffer);
+}
+
+/// Writes the 4-byte length prefix followed by the string bytes.
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const StringValue& v, int fixed_len_size, uint8_t* buffer) {
+  memcpy(buffer, &v.len, sizeof(int32_t));
+  memcpy(buffer + sizeof(int32_t), v.ptr, v.len);
+  return ByteSize(v);
+}
+
+/// Decodes a length-prefixed BYTE_ARRAY. 'v' points into 'buffer' (no copy is made).
+/// If 'fixed_len_size' > 0, v->len is truncated to it, but the full encoded size is
+/// still returned so the caller advances past the whole value.
+template <>
+inline int ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    StringValue* v) {
+  // Bounds arithmetic is done in int64_t. With 'int', a corrupt length prefix close to
+  // INT_MAX overflows 'sizeof(int32_t) + len' to a negative value that passes the
+  // bounds check, leaving 'v' pointing past 'buffer_end'. A signed size type also
+  // avoids a signed/unsigned comparison against sizeof().
+  const int64_t bytes_available = buffer_end - buffer;
+  const int64_t len_prefix_size = sizeof(int32_t);
+  if (UNLIKELY(bytes_available < len_prefix_size)) return -1;
+  memcpy(&v->len, buffer, sizeof(int32_t));
+  const int64_t byte_size = len_prefix_size + v->len;
+  // The encoded size must also be representable in the 'int' return value.
+  if (UNLIKELY(v->len < 0 || bytes_available < byte_size
+      || byte_size != static_cast<int>(byte_size))) {
+    return -1;
+  }
+  v->ptr = reinterpret_cast<char*>(const_cast<uint8_t*>(buffer)) + sizeof(int32_t);
+  if (fixed_len_size > 0) v->len = std::min(v->len, fixed_len_size);
+  // we still read byte_size bytes, even if we truncate
+  return static_cast<int>(byte_size);
+}
+
+/// Write decimals as big endian (byte comparable) to benefit from common prefixes.
+/// fixed_len_size can be less than sizeof(Decimal*Value) for space savings. This means
+/// that the value in the in-memory format has leading zeros or negative 1's.
+/// For example, precision 2 fits in 1 byte. All decimals stored as Decimal4Value
+/// will have 3 bytes of leading zeros, we will only store the interesting byte.
+template<typename T>
+inline int EncodeDecimal(const T& v, int fixed_len_size, uint8_t* buffer) {
+  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
+  return fixed_len_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const Decimal4Value& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeDecimal(v, fixed_len_size, buffer);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const Decimal8Value& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeDecimal(v, fixed_len_size, buffer);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const Decimal16Value& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeDecimal(v, fixed_len_size, buffer);
+}
+
+/// Decodes values one at a time with Decode<InternalType, PARQUET_TYPE>, advancing the
+/// output by 'stride' bytes per value. Stops and returns -1 at the first failure.
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
+    InternalType* v) {
+  const uint8_t* buffer_pos = buffer;
+  StrideWriter<InternalType> out(v, stride);
+  for (int64_t i = 0; i < num_values; ++i) {
+    int encoded_len = Decode<InternalType, PARQUET_TYPE>(
+        buffer_pos, buffer_end, fixed_len_size, out.Advance());
+    if (UNLIKELY(encoded_len < 0)) return -1;
+    buffer_pos += encoded_len;
+  }
+  return buffer_pos - buffer;
+}
+
+/// Decodes a big-endian decimal occupying exactly 'fixed_len_size' bytes.
+/// Returns 'fixed_len_size', or -1 if fewer bytes remain in the buffer.
+template <typename T>
+inline int DecodeDecimalFixedLen(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, T* v) {
+  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
+  DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
+  return fixed_len_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<bool, parquet::Type::BOOLEAN>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, bool* v) {
+  DCHECK(false) << "Use ParquetBoolDecoder for decoding bools";
+  return -1;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal4Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal8Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal16Value,
+    parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, Decimal16Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+/// Helper method to decode Decimal type stored as variable length byte array.
+template<typename T> +inline int DecodeDecimalByteArray(const uint8_t* buffer, const uint8_t* buffer_end, + int fixed_len_size, T* v) { + if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1; + int encoded_byte_size; + memcpy(&encoded_byte_size, buffer, sizeof(int32_t)); + int byte_size = sizeof(int32_t) + encoded_byte_size; + if (UNLIKELY(encoded_byte_size < 0 || buffer_end - buffer < byte_size)) return -1; + uint8_t* val_ptr = const_cast<uint8_t*>(buffer) + sizeof(int32_t); + DecimalUtil::DecodeFromFixedLenByteArray(val_ptr, encoded_byte_size, v); + return byte_size; +} + +template <> +inline int ParquetPlainEncoder::Decode<Decimal4Value, parquet::Type::BYTE_ARRAY>( + const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, + Decimal4Value* v) { + return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v); +} + +template <> +inline int ParquetPlainEncoder::Decode<Decimal8Value, parquet::Type::BYTE_ARRAY>( + const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, + Decimal8Value* v) { + return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v); +} + +template <> +inline int ParquetPlainEncoder::Decode<Decimal16Value, parquet::Type::BYTE_ARRAY>( + const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, + Decimal16Value* v) { + return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v); +} + +/// Helper class that contains the parameters needed for Timestamp decoding. +/// Can be safely passed by value. +class ParquetTimestampDecoder { +public: + ParquetTimestampDecoder() {} + + ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* timezone, + bool convert_int96_timestamps); + + bool NeedsConversion() const { return timezone_ != nullptr; } + + /// Decodes next PARQUET_TYPE from 'buffer', reading up to the byte before 'buffer_end' + /// and converts it TimestampValue. 'buffer' need not be aligned. 
+ template <parquet::Type::type PARQUET_TYPE> + int Decode(const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const; + + /// Batched version of Decode() that tries to decode 'num_values' values from the memory + /// range [buffer, buffer_end) and writes them to 'v' with a stride of 'stride' bytes. + /// Returns the number of bytes read from 'buffer' or -1 if there was an error + /// decoding, e.g. invalid data or running out of input data before reading + /// 'num_values'. + template <parquet::Type::type PARQUET_TYPE> + int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end, + int64_t num_values, int64_t stride, TimestampValue* v); + + TimestampValue Int64ToTimestampValue(int64_t unix_time) const { + DCHECK(precision_ == MILLI || precision_ == MICRO); + return precision_ == MILLI ? TimestampValue::UtcFromUnixTimeMillis(unix_time) : + TimestampValue::UtcFromUnixTimeMicros(unix_time); + } + + void ConvertToLocalTime(TimestampValue* v) const { + DCHECK(timezone_ != nullptr); + if (v->HasDateAndTime()) v->UtcToLocal(*timezone_); + } + + /// Timezone conversion of min/max stats need some extra logic because UTC->local + /// conversion can change ordering near timezone rule changes. The max value is + /// increased and min value is decreased to avoid incorrectly dropping column chunks + /// (or pages once IMPALA-5843 is ready). + + /// If timestamp t >= v before conversion, then this function converts v in such a + /// way that the same will be true after t is converted. + void ConvertMinStatToLocalTime(TimestampValue* v) const; + + /// If timestamp t <= v before conversion, then this function converts v in such a + /// way that the same will be true after t is converted. + void ConvertMaxStatToLocalTime(TimestampValue* v) const; + +private: + enum Precision { MILLI, MICRO, NANO }; + + /// Timezone used for UTC->Local conversions. If nullptr, no conversion is needed. 
+ const Timezone* timezone_ = nullptr; + + /// Unit of the encoded timestamp. Used to decide between milli and microseconds during + /// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned + /// to be implemented once it is added in Parquet (PARQUET-1387). + Precision precision_ = NANO; +}; + +template <> +inline int ParquetTimestampDecoder::Decode<parquet::Type::INT64>( + const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const { + DCHECK(precision_ == MILLI || precision_ == MICRO); + int64_t unix_time; + int bytes_read = ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT64>( + buffer, buffer_end, 0, &unix_time); + if (UNLIKELY(bytes_read < 0)) { + return bytes_read; + } + *v = Int64ToTimestampValue(unix_time); + // TODO: It would be more efficient to do the timezone conversion in the same step + // as the int64_t -> TimestampValue conversion. This would be also needed to + // move conversion/validation to dictionary construction (IMPALA-4994) and to + // implement dictionary filtering for TimestampValues. 
+ return bytes_read; +} + +template <> +inline int ParquetTimestampDecoder::Decode<parquet::Type::INT96>( + const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const { + DCHECK_EQ(precision_, NANO); + return ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>( + buffer, buffer_end, 0, v); +} + +template <parquet::Type::type PARQUET_TYPE> +inline int64_t ParquetTimestampDecoder::DecodeBatch(const uint8_t* buffer, + const uint8_t* buffer_end, int64_t num_values, int64_t stride, + TimestampValue* v) { + const uint8_t* buffer_pos = buffer; + StrideWriter<TimestampValue> out(v, stride); + for (int64_t i = 0; i < num_values; ++i) { + int encoded_len = Decode<PARQUET_TYPE>(buffer_pos, buffer_end, out.Advance()); + if (UNLIKELY(encoded_len < 0)) return -1; + buffer_pos += encoded_len; + } + return buffer_pos - buffer; +} +} +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-level-decoder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-level-decoder.cc b/be/src/exec/parquet/parquet-level-decoder.cc new file mode 100644 index 0000000..166230c --- /dev/null +++ b/be/src/exec/parquet/parquet-level-decoder.cc @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/parquet/parquet-level-decoder.h" + +#include "exec/read-write-util.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" +#include "util/bit-util.h" + +#include "common/names.h" + +using parquet::Encoding; + +namespace impala { + +const int16_t ParquetLevel::ROW_GROUP_END; +const int16_t ParquetLevel::INVALID_LEVEL; +const int16_t ParquetLevel::INVALID_POS; + +Status ParquetLevelDecoder::Init(const string& filename, Encoding::type encoding, + MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size) { + DCHECK(*data != nullptr); + DCHECK_GE(*data_size, 0); + DCHECK_GT(cache_size, 0); + cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32); + max_level_ = max_level; + filename_ = filename; + RETURN_IF_ERROR(InitCache(cache_pool, cache_size)); + + // Return because there is no level data to read, e.g., required field. + if (max_level == 0) return Status::OK(); + + int32_t num_bytes = 0; + switch (encoding) { + case Encoding::RLE: { + Status status; + if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) { + return status; + } + if (num_bytes < 0 || num_bytes > *data_size) { + return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes); + } + int bit_width = BitUtil::Log2Ceiling64(max_level + 1); + rle_decoder_.Reset(*data, num_bytes, bit_width); + break; + } + case parquet::Encoding::BIT_PACKED: + return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename); + default: { + stringstream ss; + ss << "Unsupported encoding: " << encoding; + return Status(ss.str()); + } + } + if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) { + return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded levels but " + "only $2 bytes left in page", + filename, num_bytes, *data_size)); + } + *data += num_bytes; + *data_size -= num_bytes; + return Status::OK(); +} + +Status 
ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) { + num_cached_levels_ = 0; + cached_level_idx_ = 0; + // Memory has already been allocated. + if (cached_levels_ != nullptr) { + DCHECK_EQ(cache_size_, cache_size); + return Status::OK(); + } + + cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size)); + if (cached_levels_ == nullptr) { + return pool->mem_tracker()->MemLimitExceeded( + nullptr, "Definition level cache", cache_size); + } + memset(cached_levels_, 0, cache_size); + cache_size_ = cache_size; + return Status::OK(); +} + +Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) { + /// Fill the cache completely if there are enough values remaining. + /// Otherwise don't try to read more values than are left. + int batch_size = min(vals_remaining, cache_size_); + if (max_level_ > 0) { + if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) + || num_cached_levels_ < batch_size)) { + return Status(decoding_error_code_, vals_remaining, filename_); + } + } else { + // No levels to read, e.g., because the field is required. The cache was + // already initialized with all zeros, so we can hand out those values. + DCHECK_EQ(max_level_, 0); + cached_level_idx_ = 0; + num_cached_levels_ = batch_size; + } + return Status::OK(); +} + +bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) { + DCHECK(!CacheHasNext()); + DCHECK(num_cached_levels != nullptr); + DCHECK_GE(max_level_, 0); + DCHECK_GE(*num_cached_levels, 0); + cached_level_idx_ = 0; + if (max_level_ == 0) { + // No levels to read, e.g., because the field is required. The cache was + // already initialized with all zeros, so we can hand out those values. + *num_cached_levels = batch_size; + return true; + } + *num_cached_levels = rle_decoder_.GetValues(batch_size, cached_levels_); + return *num_cached_levels > 0; +} + +} // namespace impala
