http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-level-decoder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-level-decoder.h b/be/src/exec/parquet/parquet-level-decoder.h new file mode 100644 index 0000000..2e0c24e --- /dev/null +++ b/be/src/exec/parquet/parquet-level-decoder.h @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +#include "common/status.h" +#include "gen-cpp/parquet_types.h" +#include "util/rle-encoding.h" + +namespace impala { + +class MemPool; + +/// Constants used instead of actual levels to indicate special conditions. +class ParquetLevel { + public: + /// The rep and def levels are set to this value to indicate the end of a row group. + static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min(); + /// Indicates an invalid definition or repetition level. + static const int16_t INVALID_LEVEL = -1; + /// Indicates an invalid position value. + static const int16_t INVALID_POS = -1; +}; + +/// Decoder for encoded Parquet levels. Only supports the RLE encoding, not the deprecated +/// BIT_PACKED encoding. 
Optionally reads, decodes, and caches level values in batches. +/// Level values are unsigned 8-bit integers because we support a maximum nesting +/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up +/// populating the level cache (e.g., with RLE we can memset() repeated values). +class ParquetLevelDecoder { + public: + ParquetLevelDecoder(bool is_def_level_decoder) + : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR : + TErrorCode::PARQUET_REP_LEVEL_ERROR) {} + + /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the + /// encoding requires reading metadata from the page header. 'cache_size' will be + /// rounded up to a multiple of 32 internally. + Status Init(const string& filename, parquet::Encoding::type encoding, + MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size); + + /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient + /// as batched methods. + inline int16_t ReadLevel(); + + /// If the next value is part of a repeated run and is not cached, return the length + /// of the repeated run. A max level of 0 is treated as an arbitrarily long run of + /// zeroes, so this returns numeric_limits<int32_t>::max(). Otherwise return 0. + inline int32_t NextRepeatedRunLength(); + + /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume + /// 'num_to_consume' items in the run. Not valid to call if there are cached levels + /// that have not been consumed. + inline uint8_t GetRepeatedValue(uint32_t num_to_consume); + + /// Decodes and caches the next batch of levels given that there are 'vals_remaining' + /// values left to decode in the page. Resets members associated with the cache. 
+ /// Returns a non-ok status if there was a problem decoding a level, if a level was + /// encountered with a value greater than max_level_, or if fewer than + /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the + /// input did not have the expected number of values. Only valid to call when + /// the cache has been exhausted, i.e. CacheHasNext() is false. + Status CacheNextBatch(int vals_remaining); + + /// Functions for working with the level cache. + bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; } + uint8_t CacheGetNext() { + DCHECK_LT(cached_level_idx_, num_cached_levels_); + return cached_levels_[cached_level_idx_++]; + } + void CacheSkipLevels(int num_levels) { + DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_); + cached_level_idx_ += num_levels; + } + int CacheSize() const { return num_cached_levels_; } + int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; } + int CacheCurrIdx() const { return cached_level_idx_; } + + private: + /// Initializes members associated with the level cache. Allocates memory for + /// the cache from pool, if necessary. + Status InitCache(MemPool* pool, int cache_size); + + /// Decodes and writes a batch of levels into the cache. Returns true and sets + /// the number of values written to the cache via *num_cached_levels if no errors + /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the + /// end of input was hit without any other errors. Returns false if there was an + /// error decoding a level or if there was an invalid level value greater than + /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e. + /// CacheHasNext() is false. + bool FillCache(int batch_size, int* num_cached_levels); + + /// RLE decoder, used if max_level_ > 0. + RleBatchDecoder<uint8_t> rle_decoder_; + + /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed + /// in Init(). 
+ uint8_t* cached_levels_ = nullptr; + + /// Number of valid level values in the cache. + int num_cached_levels_ = 0; + + /// Current index into cached_levels_. + int cached_level_idx_ = 0; + + /// For error checking and reporting. + int max_level_ = 0; + + /// Number of level values cached_levels_ has memory allocated for. Always + /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches. + int cache_size_ = 0; + + /// Name of the parquet file. Used for reporting level decoding errors. + string filename_; + + /// Error code to use when reporting level decoding errors. + TErrorCode::type decoding_error_code_; +}; + +inline int16_t ParquetLevelDecoder::ReadLevel() { + if (UNLIKELY(!CacheHasNext())) { + if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) { + return ParquetLevel::INVALID_LEVEL; + } + DCHECK_GE(num_cached_levels_, 0); + if (UNLIKELY(num_cached_levels_ == 0)) { + return ParquetLevel::INVALID_LEVEL; + } + } + return CacheGetNext(); +} + +inline int32_t ParquetLevelDecoder::NextRepeatedRunLength() { + if (CacheHasNext()) return 0; + // Treat always-zero levels as an infinitely long run of zeroes. Return the maximum + // run length allowed by the Parquet standard. + if (max_level_ == 0) return numeric_limits<int32_t>::max(); + return rle_decoder_.NextNumRepeats(); +} + +inline uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) { + DCHECK(!CacheHasNext()); + // Treat always-zero levels as an infinitely long run of zeroes. + if (max_level_ == 0) return 0; + return rle_decoder_.GetRepeatedValue(num_to_consume); +} + +} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-metadata-utils.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc new file mode 100644 index 0000000..7cbfeda --- /dev/null +++ b/be/src/exec/parquet/parquet-metadata-utils.cc @@ -0,0 +1,733 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/parquet/parquet-metadata-utils.h" + +#include <strings.h> +#include <sstream> +#include <string> +#include <vector> + +#include <boost/algorithm/string.hpp> +#include <gutil/strings/substitute.h> + +#include "common/logging.h" +#include "common/status.h" +#include "exec/parquet/parquet-column-stats.h" +#include "exec/parquet/parquet-common.h" +#include "runtime/runtime-state.h" +#include "util/debug-util.h" + +#include "common/names.h" + +using boost::algorithm::is_any_of; +using boost::algorithm::split; +using boost::algorithm::token_compress_on; + +namespace impala { + +namespace { + +const map<PrimitiveType, set<parquet::Type::type>> SUPPORTED_PHYSICAL_TYPES = { + {PrimitiveType::INVALID_TYPE, {parquet::Type::BOOLEAN}}, + {PrimitiveType::TYPE_NULL, {parquet::Type::BOOLEAN}}, + {PrimitiveType::TYPE_BOOLEAN, {parquet::Type::BOOLEAN}}, + {PrimitiveType::TYPE_TINYINT, {parquet::Type::INT32}}, + {PrimitiveType::TYPE_SMALLINT, {parquet::Type::INT32}}, + {PrimitiveType::TYPE_INT, {parquet::Type::INT32}}, + {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT32, parquet::Type::INT64}}, + {PrimitiveType::TYPE_FLOAT, {parquet::Type::FLOAT}}, + {PrimitiveType::TYPE_DOUBLE, {parquet::Type::INT32, parquet::Type::FLOAT, + parquet::Type::DOUBLE}}, + {PrimitiveType::TYPE_TIMESTAMP, {parquet::Type::INT96}}, + {PrimitiveType::TYPE_STRING, {parquet::Type::BYTE_ARRAY}}, + {PrimitiveType::TYPE_DATE, {parquet::Type::BYTE_ARRAY}}, + {PrimitiveType::TYPE_DATETIME, {parquet::Type::BYTE_ARRAY}}, + {PrimitiveType::TYPE_BINARY, {parquet::Type::BYTE_ARRAY}}, + {PrimitiveType::TYPE_DECIMAL, {parquet::Type::INT32, parquet::Type::INT64, + parquet::Type::FIXED_LEN_BYTE_ARRAY, parquet::Type::BYTE_ARRAY}}, + {PrimitiveType::TYPE_CHAR, {parquet::Type::BYTE_ARRAY}}, + {PrimitiveType::TYPE_VARCHAR, {parquet::Type::BYTE_ARRAY}}, +}; + +/// Physical types that are only supported with specific converted types. 
+const map<PrimitiveType, set<pair<parquet::Type::type, parquet::ConvertedType::type>>> + SUPPORTED_CONVERTED_TYPES = { + {PrimitiveType::TYPE_TIMESTAMP, + {{parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MICROS}, + {parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MILLIS}}}}; +}; + +/// Returns true if 'parquet_type' is a supported physical encoding for the Impala +/// primitive type, false otherwise. Some physical types are accepted only for certain +/// converted types. +bool IsSupportedType(PrimitiveType impala_type, + const parquet::SchemaElement& element) { + auto encodings = SUPPORTED_PHYSICAL_TYPES.find(impala_type); + DCHECK(encodings != SUPPORTED_PHYSICAL_TYPES.end()); + parquet::Type::type parquet_type = element.type; + if (encodings->second.find(parquet_type) != encodings->second.end()) return true; + + if(!element.__isset.converted_type) return false; + parquet::ConvertedType::type converted_type = element.converted_type; + auto converted_types = SUPPORTED_CONVERTED_TYPES.find(impala_type); + if (converted_types == SUPPORTED_CONVERTED_TYPES.end()) return false; + if (converted_types->second.find({parquet_type, converted_type}) + != converted_types->second.end()) return true; + + return false; +} + +// Needs to be in sync with the order of enum values declared in TParquetArrayResolution. +const std::vector<ParquetSchemaResolver::ArrayEncoding> + ParquetSchemaResolver::ORDERED_ARRAY_ENCODINGS[] = + {{ParquetSchemaResolver::THREE_LEVEL, ParquetSchemaResolver::ONE_LEVEL}, + {ParquetSchemaResolver::TWO_LEVEL, ParquetSchemaResolver::ONE_LEVEL}, + {ParquetSchemaResolver::TWO_LEVEL, ParquetSchemaResolver::THREE_LEVEL, + ParquetSchemaResolver::ONE_LEVEL}}; + +Status ParquetMetadataUtils::ValidateFileVersion( + const parquet::FileMetaData& file_metadata, const char* filename) { + if (file_metadata.version > PARQUET_CURRENT_VERSION) { + stringstream ss; + ss << "File: " << filename << " is of an unsupported version. 
" + << "file version: " << file_metadata.version; + return Status(ss.str()); + } + return Status::OK(); +} + +Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename, + int64_t file_length, const parquet::RowGroup& row_group) { + for (int i = 0; i < row_group.columns.size(); ++i) { + const parquet::ColumnChunk& col_chunk = row_group.columns[i]; + RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length, + col_chunk.meta_data.data_page_offset, "data page offset")); + int64_t col_start = col_chunk.meta_data.data_page_offset; + // The file format requires that if a dictionary page exists, it be before data pages. + if (col_chunk.meta_data.__isset.dictionary_page_offset) { + RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length, + col_chunk.meta_data.dictionary_page_offset, "dictionary page offset")); + if (col_chunk.meta_data.dictionary_page_offset >= col_start) { + return Status(Substitute("Parquet file '$0': metadata is corrupt. Dictionary " + "page (offset=$1) must come before any data pages (offset=$2).", + filename, col_chunk.meta_data.dictionary_page_offset, col_start)); + } + col_start = col_chunk.meta_data.dictionary_page_offset; + } + int64_t col_len = col_chunk.meta_data.total_compressed_size; + int64_t col_end = col_start + col_len; + if (col_end <= 0 || col_end > file_length) { + return Status(Substitute("Parquet file '$0': metadata is corrupt. Column $1 has " + "invalid column offsets (offset=$2, size=$3, file_size=$4).", filename, i, + col_start, col_len, file_length)); + } + } + return Status::OK(); +} + +Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int col_idx, + int64_t file_length, int64_t offset, const string& offset_name) { + if (offset < 0 || offset >= file_length) { + return Status(Substitute("File '$0': metadata is corrupt. 
Column $1 has invalid " + "$2 (offset=$3 file_size=$4).", filename, col_idx, offset_name, offset, + file_length)); + } + return Status::OK();; +} + +static bool IsEncodingSupported(parquet::Encoding::type e) { + switch (e) { + case parquet::Encoding::PLAIN: + case parquet::Encoding::PLAIN_DICTIONARY: + case parquet::Encoding::BIT_PACKED: + case parquet::Encoding::RLE: + return true; + default: + return false; + } +} + +Status ParquetMetadataUtils::ValidateRowGroupColumn( + const parquet::FileMetaData& file_metadata, const char* filename, int row_group_idx, + int col_idx, const parquet::SchemaElement& schema_element, RuntimeState* state) { + const parquet::ColumnMetaData& col_chunk_metadata = + file_metadata.row_groups[row_group_idx].columns[col_idx].meta_data; + + // Check the encodings are supported. + const vector<parquet::Encoding::type>& encodings = col_chunk_metadata.encodings; + for (int i = 0; i < encodings.size(); ++i) { + if (!IsEncodingSupported(encodings[i])) { + return Status(Substitute("File '$0' uses an unsupported encoding: $1 for column " + "'$2'.", filename, PrintThriftEnum(encodings[i]), schema_element.name)); + } + } + + // Check the compression is supported. + if (col_chunk_metadata.codec != parquet::CompressionCodec::UNCOMPRESSED && + col_chunk_metadata.codec != parquet::CompressionCodec::SNAPPY && + col_chunk_metadata.codec != parquet::CompressionCodec::GZIP) { + return Status(Substitute("File '$0' uses an unsupported compression: $1 for column " + "'$2'.", filename, col_chunk_metadata.codec, schema_element.name)); + } + + if (col_chunk_metadata.type != schema_element.type) { + return Status(Substitute("Mismatched column chunk Parquet type in file '$0' column " + "'$1'. 
Expected $2 actual $3: file may be corrupt", filename, + schema_element.name, col_chunk_metadata.type, schema_element.type)); + } + return Status::OK(); +} + +Status ParquetMetadataUtils::ValidateColumn(const char* filename, + const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc, + RuntimeState* state) { + // Following validation logic is only for non-complex types. + if (slot_desc->type().IsComplexType()) return Status::OK(); + + if (UNLIKELY(!IsSupportedType(slot_desc->type().type, schema_element))) { + return Status(Substitute("Unsupported Parquet type in file '$0' metadata. Logical " + "type: $1, physical type: $2. File may be corrupt.", + filename, slot_desc->type().type, schema_element.type)); + } + + // Check the decimal scale in the file matches the metastore scale and precision. + // We fail the query if the metadata makes it impossible for us to safely read + // the file. If we don't require the metadata, we will fail the query if + // abort_on_error is true, otherwise we will just log a warning. + bool is_converted_type_decimal = schema_element.__isset.converted_type + && schema_element.converted_type == parquet::ConvertedType::DECIMAL; + if (slot_desc->type().type == TYPE_DECIMAL) { + // TODO: allow converting to wider type (IMPALA-2515) + if (schema_element.type == parquet::Type::INT32 && + sizeof(int32_t) != slot_desc->type().GetByteSize()) { + return Status(Substitute("File '$0' decimal column '$1' is stored as INT32, but " + "based on the precision in the table metadata, another type would needed.", + filename, schema_element.name)); + } + if (schema_element.type == parquet::Type::INT64 && + sizeof(int64_t) != slot_desc->type().GetByteSize()) { + return Status(Substitute("File '$0' decimal column '$1' is stored as INT64, but " + "based on the precision in the table metadata, another type would needed.", + filename, schema_element.name)); + } + // We require that the scale and byte length be set. 
+ if (schema_element.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) { + if (!schema_element.__isset.type_length) { + return Status(Substitute("File '$0' column '$1' does not have type_length set.", + filename, schema_element.name)); + } + + int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type()); + if (schema_element.type_length != expected_len) { + return Status(Substitute("File '$0' column '$1' has an invalid type length. " + "Expecting: $2 len in file: $3", filename, schema_element.name, expected_len, + schema_element.type_length)); + } + } + if (!schema_element.__isset.scale) { + return Status(Substitute("File '$0' column '$1' does not have the scale set.", + filename, schema_element.name)); + } + + if (schema_element.scale != slot_desc->type().scale) { + // TODO: we could allow a mismatch and do a conversion at this step. + return Status(Substitute("File '$0' column '$1' has a scale that does not match " + "the table metadata scale. File metadata scale: $2 Table metadata scale: $3", + filename, schema_element.name, schema_element.scale, slot_desc->type().scale)); + } + + // The other decimal metadata should be there but we don't need it. + if (!schema_element.__isset.precision) { + ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, schema_element.name); + RETURN_IF_ERROR(state->LogOrReturnError(msg)); + } else { + if (schema_element.precision != slot_desc->type().precision) { + // TODO: we could allow a mismatch and do a conversion at this step. + ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name, + schema_element.precision, slot_desc->type().precision); + RETURN_IF_ERROR(state->LogOrReturnError(msg)); + } + } + + if (!is_converted_type_decimal) { + // TODO: is this validation useful? It is not required at all to read the data and + // might only serve to reject otherwise perfectly readable files. 
+ ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename, + schema_element.name); + RETURN_IF_ERROR(state->LogOrReturnError(msg)); + } + } else if (schema_element.__isset.scale || schema_element.__isset.precision + || is_converted_type_decimal) { + ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename, schema_element.name, + slot_desc->type().DebugString()); + RETURN_IF_ERROR(state->LogOrReturnError(msg)); + } + return Status::OK(); +} + +ParquetFileVersion::ParquetFileVersion(const string& created_by) { + string created_by_lower = created_by; + std::transform(created_by_lower.begin(), created_by_lower.end(), + created_by_lower.begin(), ::tolower); + is_impala_internal = false; + + vector<string> tokens; + split(tokens, created_by_lower, is_any_of(" "), token_compress_on); + // Boost always creates at least one token + DCHECK_GT(tokens.size(), 0); + application = tokens[0]; + + if (tokens.size() >= 3 && tokens[1] == "version") { + string version_string = tokens[2]; + // Ignore any trailing nodextra characters + int n = version_string.find_first_not_of("0123456789."); + string version_string_trimmed = version_string.substr(0, n); + + vector<string> version_tokens; + split(version_tokens, version_string_trimmed, is_any_of(".")); + version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0; + version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0; + version.patch = version_tokens.size() >= 3 ? 
atoi(version_tokens[2].c_str()) : 0; + + if (application == "impala") { + if (version_string.find("-internal") != string::npos) is_impala_internal = true; + } + } else { + version.major = 0; + version.minor = 0; + version.patch = 0; + } +} + +bool ParquetFileVersion::VersionLt(int major, int minor, int patch) const { + if (version.major < major) return true; + if (version.major > major) return false; + DCHECK_EQ(version.major, major); + if (version.minor < minor) return true; + if (version.minor > minor) return false; + DCHECK_EQ(version.minor, minor); + return version.patch < patch; +} + +bool ParquetFileVersion::VersionEq(int major, int minor, int patch) const { + return version.major == major && version.minor == minor && version.patch == patch; +} + +static string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) { + switch (t) { + case parquet::FieldRepetitionType::REQUIRED: return "required"; + case parquet::FieldRepetitionType::OPTIONAL: return "optional"; + case parquet::FieldRepetitionType::REPEATED: return "repeated"; + default: return "<unknown>"; + } +} + +static string PrintParquetType(const parquet::Type::type& t) { + switch (t) { + case parquet::Type::BOOLEAN: return "boolean"; + case parquet::Type::INT32: return "int32"; + case parquet::Type::INT64: return "int64"; + case parquet::Type::INT96: return "int96"; + case parquet::Type::FLOAT: return "float"; + case parquet::Type::DOUBLE: return "double"; + case parquet::Type::BYTE_ARRAY: return "byte_array"; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array"; + default: return "<unknown>"; + } +} + +string SchemaNode::DebugString(int indent) const { + stringstream ss; + for (int i = 0; i < indent; ++i) ss << " "; + ss << PrintRepetitionType(element->repetition_type) << " "; + if (element->num_children > 0) { + ss << "struct"; + } else { + ss << PrintParquetType(element->type); + } + ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level + << " r:" 
<< max_rep_level << "]"; + if (element->num_children > 0) { + ss << " {" << endl; + for (int i = 0; i < element->num_children; ++i) { + ss << children[i].DebugString(indent + 2) << endl; + } + for (int i = 0; i < indent; ++i) ss << " "; + ss << "}"; + } + return ss.str(); +} + +Status ParquetSchemaResolver::CreateSchemaTree( + const vector<parquet::SchemaElement>& schema, SchemaNode* node) const { + int idx = 0; + int col_idx = 0; + RETURN_IF_ERROR(CreateSchemaTree(schema, 0, 0, 0, &idx, &col_idx, node)); + if (node->children.empty()) { + return Status(Substitute("Invalid file: '$0' has no columns.", filename_)); + } + return Status::OK(); +} + +Status ParquetSchemaResolver::CreateSchemaTree( + const vector<parquet::SchemaElement>& schema, int max_def_level, int max_rep_level, + int ira_def_level, int* idx, int* col_idx, SchemaNode* node) + const { + if (*idx >= schema.size()) { + return Status(Substitute("File '$0' corrupt: could not reconstruct schema tree from " + "flattened schema in file metadata", filename_)); + } + bool is_root_schema = (*idx == 0); + node->element = &schema[*idx]; + ++(*idx); + + if (node->element->num_children == 0) { + // node is a leaf node, meaning it's materialized in the file and appears in + // file_metadata_.row_groups.columns + node->col_idx = *col_idx; + ++(*col_idx); + } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) { + // Sanity-check the schema to avoid allocating absurdly large buffers below. + return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than " + "limit of $2. 
File is likely corrupt", filename_, node->element->num_children, + SCHEMA_NODE_CHILDREN_SANITY_LIMIT)); + } else if (node->element->num_children < 0) { + return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.", + filename_, node->element->num_children)); + } + + // def_level_of_immediate_repeated_ancestor does not include this node, so set before + // updating ira_def_level + node->def_level_of_immediate_repeated_ancestor = ira_def_level; + + if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) { + ++max_def_level; + } else if (node->element->repetition_type == parquet::FieldRepetitionType::REPEATED && + !is_root_schema /*PARQUET-843*/) { + ++max_rep_level; + // Repeated fields add a definition level. This is used to distinguish between an + // empty list and a list with an item in it. + ++max_def_level; + // node is the new most immediate repeated ancestor + ira_def_level = max_def_level; + } + node->max_def_level = max_def_level; + node->max_rep_level = max_rep_level; + + node->children.resize(node->element->num_children); + for (int i = 0; i < node->element->num_children; ++i) { + RETURN_IF_ERROR(CreateSchemaTree(schema, max_def_level, max_rep_level, ira_def_level, + idx, col_idx, &node->children[i])); + } + return Status::OK(); +} + +Status ParquetSchemaResolver::ResolvePath(const SchemaPath& path, SchemaNode** node, + bool* pos_field, bool* missing_field) const { + *missing_field = false; + const vector<ArrayEncoding>& ordered_array_encodings = + ORDERED_ARRAY_ENCODINGS[array_resolution_]; + + bool any_missing_field = false; + Status statuses[NUM_ARRAY_ENCODINGS]; + for (const auto& array_encoding: ordered_array_encodings) { + bool current_missing_field; + statuses[array_encoding] = ResolvePathHelper( + array_encoding, path, node, pos_field, ¤t_missing_field); + if (current_missing_field) DCHECK(statuses[array_encoding].ok()); + if (statuses[array_encoding].ok() && !current_missing_field) return 
Status::OK(); + any_missing_field = any_missing_field || current_missing_field; + } + // None of resolutions yielded a node. Set *missing_field to true if any of the + // resolutions reported a missing a field. + if (any_missing_field) { + *node = NULL; + *missing_field = true; + return Status::OK(); + } + + // All resolutions failed. Log and return the most relevant status. The three-level + // encoding is the Parquet standard, so always prefer that. Prefer the two-level over + // the one-level because the two-level can be specifically selected via a query option. + Status error_status = Status::OK(); + for (int i = THREE_LEVEL; i >= ONE_LEVEL; --i) { + if (!statuses[i].ok()) { + error_status = statuses[i]; + break; + } + } + DCHECK(!error_status.ok()); + *node = NULL; + VLOG_QUERY << error_status.msg().msg() << "\n" << GetStackTrace(); + return error_status; +} + +Status ParquetSchemaResolver::ResolvePathHelper(ArrayEncoding array_encoding, + const SchemaPath& path, SchemaNode** node, bool* pos_field, + bool* missing_field) const { + DCHECK(schema_.element != NULL) + << "schema_ must be initialized before calling ResolvePath()"; + + *pos_field = false; + *missing_field = false; + *node = const_cast<SchemaNode*>(&schema_); + const ColumnType* col_type = NULL; + + // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by + // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema. + for (int i = 0; i < path.size(); ++i) { + // Advance '*node' if necessary + if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) { + *node = NextSchemaNode(col_type, path, i, *node, missing_field); + if (*missing_field) return Status::OK(); + } else { + // We just resolved an array, meaning *node is set to the repeated field of the + // array. Since we are trying to resolve using one- or two-level array encoding, the + // repeated field represents both the array and the array's item (i.e. 
there is no + // explict item field), so we don't advance *node in this case. + DCHECK(col_type != NULL); + DCHECK_EQ(col_type->type, TYPE_ARRAY); + DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL); + DCHECK((*node)->is_repeated()); + } + + // Advance 'col_type' + int table_idx = path[i]; + col_type = i == 0 ? &tbl_desc_.col_descs()[table_idx].type() + : &col_type->children[table_idx]; + + // Resolve path[i] + if (col_type->type == TYPE_ARRAY) { + DCHECK_EQ(col_type->children.size(), 1); + RETURN_IF_ERROR( + ResolveArray(array_encoding, path, i, node, pos_field, missing_field)); + if (*missing_field || *pos_field) return Status::OK(); + } else if (col_type->type == TYPE_MAP) { + DCHECK_EQ(col_type->children.size(), 2); + RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field)); + if (*missing_field) return Status::OK(); + } else if (col_type->type == TYPE_STRUCT) { + DCHECK_GT(col_type->children.size(), 0); + // Nothing to do for structs + } else { + DCHECK(!col_type->IsComplexType()); + DCHECK_EQ(i, path.size() - 1); + RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i)); + } + } + DCHECK(*node != NULL); + return Status::OK(); +} + +SchemaNode* ParquetSchemaResolver::NextSchemaNode( + const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node, + bool* missing_field) const { + DCHECK_LT(next_idx, path.size()); + if (next_idx != 0) DCHECK(col_type != NULL); + + int file_idx; + int table_idx = path[next_idx]; + if (fallback_schema_resolution_ == TParquetFallbackSchemaResolution::type::NAME) { + if (next_idx == 0) { + // Resolve top-level table column by name. + DCHECK_LT(table_idx, tbl_desc_.col_descs().size()); + const string& name = tbl_desc_.col_descs()[table_idx].name(); + file_idx = FindChildWithName(node, name); + } else if (col_type->type == TYPE_STRUCT) { + // Resolve struct field by name. 
+ DCHECK_LT(table_idx, col_type->field_names.size()); + const string& name = col_type->field_names[table_idx]; + file_idx = FindChildWithName(node, name); + } else if (col_type->type == TYPE_ARRAY) { + // Arrays have only one child in the file. + DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM); + file_idx = table_idx; + } else { + DCHECK_EQ(col_type->type, TYPE_MAP); + // Maps have two values, "key" and "value". These are supposed to be ordered and may + // not have the right field names, but try to resolve by name in case they're + // switched and otherwise use the order. See + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for + // more details. + DCHECK(table_idx == SchemaPathConstants::MAP_KEY || + table_idx == SchemaPathConstants::MAP_VALUE); + const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value"; + file_idx = FindChildWithName(node, name); + if (file_idx >= node->children.size()) { + // Couldn't resolve by name, fall back to resolution by position. + file_idx = table_idx; + } + } + } else { + // Resolution by position. + DCHECK_EQ(fallback_schema_resolution_, + TParquetFallbackSchemaResolution::type::POSITION); + if (next_idx == 0) { + // For top-level columns, the first index in a path includes the table's partition + // keys. 
+ file_idx = table_idx - tbl_desc_.num_clustering_cols(); + } else { + file_idx = table_idx; + } + } + + if (file_idx >= node->children.size()) { + string schema_resolution_mode = "unknown"; + auto entry = _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.find( + fallback_schema_resolution_); + if (entry != _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.end()) { + schema_resolution_mode = entry->second; + } + VLOG_FILE << Substitute( + "File '$0' does not contain path '$1' (resolving by $2)", filename_, + PrintPath(tbl_desc_, path), schema_resolution_mode); + *missing_field = true; + return NULL; + } + return &node->children[file_idx]; +} + +int ParquetSchemaResolver::FindChildWithName(SchemaNode* node, + const string& name) const { + int idx; + for (idx = 0; idx < node->children.size(); ++idx) { + if (strcasecmp(node->children[idx].element->name.c_str(), name.c_str()) == 0) break; + } + return idx; +} + +// There are three types of array encodings: +// +// 1. One-level encoding +// A bare repeated field. This is interpreted as a required array of required +// items. +// Example: +// repeated <item-type> item; +// +// 2. Two-level encoding +// A group containing a single repeated field. This is interpreted as a +// <list-repetition> array of required items (<list-repetition> is either +// optional or required). +// Example: +// <list-repetition> group <name> { +// repeated <item-type> item; +// } +// +// 3. Three-level encoding +// The "official" encoding according to the parquet spec. A group containing a +// single repeated group containing the item field. This is interpreted as a +// <list-repetition> array of <item-repetition> items (<list-repetition> and +// <item-repetition> are each either optional or required). +// Example: +// <list-repetition> group <name> { +// repeated group list { +// <item-repetition> <item-type> item; +// } +// } +// +// We ignore any field annotations or names, making us more permissive than the +// Parquet spec dictates. 
Note that in any of the encodings, <item-type> may be a +// group containing more fields, which corresponds to a complex item type. See +// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for +// more details and examples. +// +// This function resolves the array at '*node' assuming one-, two-, or three-level +// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all +// three encodings (unless '*pos_field' or '*missing_field' are set to true). +Status ParquetSchemaResolver::ResolveArray(ArrayEncoding array_encoding, + const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field, + bool* missing_field) const { + if (array_encoding == ONE_LEVEL) { + if (!(*node)->is_repeated()) { + ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, + PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString()); + return Status::Expected(msg); + } + } else { + // In the multi-level case, we always expect the outer group to contain a single + // repeated field + if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) { + ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, + PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString()); + return Status::Expected(msg); + } + // Set *node to the repeated field + *node = &(*node)->children[0]; + } + DCHECK((*node)->is_repeated()); + + if (idx + 1 < path.size()) { + if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) { + // The next index in 'path' is the artifical position field. 
+ DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!"; + *pos_field = true; + *node = NULL; + return Status::OK(); + } else { + // The next value in 'path' should be the item index + DCHECK_EQ(path[idx + 1], SchemaPathConstants::ARRAY_ITEM); + } + } + return Status::OK(); +} + +// According to the parquet spec, map columns are represented like: +// <map-repetition> group <name> (MAP) { +// repeated group key_value { +// required <key-type> key; +// <value-repetition> <value-type> value; +// } +// } +// We ignore any field annotations or names, making us more permissive than the +// Parquet spec dictates. See +// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for +// more details. +Status ParquetSchemaResolver::ResolveMap(const SchemaPath& path, int idx, + SchemaNode** node, bool* missing_field) const { + if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated() || + (*node)->children[0].children.size() != 2) { + ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, + PrintSubPath(tbl_desc_, path, idx), "map", (*node)->DebugString()); + return Status::Expected(msg); + } + *node = &(*node)->children[0]; + + // The next index in 'path' should be the key or the value. 
+ if (idx + 1 < path.size()) { + DCHECK(path[idx + 1] == SchemaPathConstants::MAP_KEY || + path[idx + 1] == SchemaPathConstants::MAP_VALUE); + } + return Status::OK(); +} + +Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node, + const ColumnType& col_type, const SchemaPath& path, int idx) const { + if (!node.children.empty()) { + ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, + PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString()); + return Status::Expected(msg); + } + if (!IsSupportedType(col_type.type, *node.element)) { + ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, + PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString()); + return Status::Expected(msg); + } + return Status::OK(); +} + +} http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-metadata-utils.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-metadata-utils.h b/be/src/exec/parquet/parquet-metadata-utils.h new file mode 100644 index 0000000..f3a144d --- /dev/null +++ b/be/src/exec/parquet/parquet-metadata-utils.h @@ -0,0 +1,233 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
/// Describes which application, and which version of it, wrote a Parquet file.
struct ParquetFileVersion {
  /// Application that wrote the file. e.g. "IMPALA"
  std::string application;

  /// Version of the application that wrote the file, expressed in three parts
  /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
  /// ignored. e.g.:
  /// "1.2.3"    => {1, 2, 3}
  /// "1.2"      => {1, 2, 0}
  /// "1.2-cdh5" => {1, 2, 0}
  ///
  /// Every part carries a default initializer so that a default-constructed object
  /// holds 0.0.0 instead of indeterminate values.
  struct {
    int major = 0;
    int minor = 0;
    int patch = 0;
  } version;

  /// If true, this file was generated by an Impala internal release
  bool is_impala_internal = false;

  /// Creates an empty version: no application name, version 0.0.0, not Impala internal.
  ParquetFileVersion() { }

  /// Parses the version from the created_by string
  ParquetFileVersion(const std::string& created_by);

  /// Returns true if version is strictly less than <major>.<minor>.<patch>
  bool VersionLt(int major, int minor = 0, int patch = 0) const;

  /// Returns true if version is equal to <major>.<minor>.<patch>
  bool VersionEq(int major, int minor, int patch) const;
};
+ std::vector<SchemaNode> children; + + SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1), + def_level_of_immediate_repeated_ancestor(-1) { } + + std::string DebugString(int indent = 0) const; + + bool is_repeated() const { + return element->repetition_type == parquet::FieldRepetitionType::REPEATED; + } +}; + +/// Utility class to resolve SchemaPaths (e.g., from a table descriptor) against a +/// Parquet file schema. Supports resolution by field index or by field name. +/// Supports different policies for resolving nested arrays based on the modern +/// three-level encoding or the legacy encodings (one and two level). +class ParquetSchemaResolver { + public: + ParquetSchemaResolver(const HdfsTableDescriptor& tbl_desc, + TParquetFallbackSchemaResolution::type fallback_schema_resolution, + TParquetArrayResolution::type array_resolution) + : tbl_desc_(tbl_desc), + fallback_schema_resolution_(fallback_schema_resolution), + array_resolution_(array_resolution), + filename_(NULL) { + } + + /// Parses the schema of the given file metadata into an internal schema + /// representation used in path resolution. Remembers the filename for error + /// reporting. Returns a non-OK status if the Parquet schema could not be parsed. + Status Init(const parquet::FileMetaData* file_metadata, const char* filename) { + DCHECK(filename != NULL); + filename_ = filename; + return CreateSchemaTree(file_metadata->schema, &schema_); + } + + /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path' + /// does not exist in this file's schema, 'missing_field' is set to true and + /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path' + /// resolves to a collection position field, *pos_field is set to true. Otherwise + /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved + /// against the file's schema (e.g., unrecognized collection schema). 
+ /// + /// Tries to resolve fields within lists according to the 'ordered_array_encodings_'. + /// Returns a bad status if resolution fails for all attempted array encodings. + Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field, + bool* missing_field) const; + + private: + /// The 'array_encoding' parameter determines whether to assume one-, two-, or + /// three-level array encoding. The returned status is not logged (i.e. it's an expected + /// error). + enum ArrayEncoding { + ONE_LEVEL, + TWO_LEVEL, + THREE_LEVEL, + NUM_ARRAY_ENCODINGS + }; + + /// An arbitrary limit on the number of children per schema node we support. + /// Used to sanity-check Parquet schemas. + static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024; + + /// Maps from the array-resolution policy to the ordered array encodings that should + /// be tried during path resolution. All entries have the ONE_LEVEL encoding at the end + /// because there is no ambiguity between the one-level and the other encodings (there + /// is no harm in trying it). + static const std::vector<ArrayEncoding> ORDERED_ARRAY_ENCODINGS[]; + + /// Unflattens the schema metadata from a Parquet file metadata and converts it to our + /// SchemaNode representation. Returns the result in 'node' unless an error status is + /// returned. Does not set the slot_desc field of any SchemaNode. + Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema, + SchemaNode* node) const; + + /// Recursive implementation used internally by the above CreateSchemaTree() function. + Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema, + int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx, + SchemaNode* node) const; + + Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path, + SchemaNode** node, bool* pos_field, bool* missing_field) const; + + /// Helper functions for ResolvePathHelper(). 
+ + /// Advances 'node' to one of its children based on path[next_idx] and + /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type + /// associated with 'node'. Returns the child node or sets 'missing_field' to true. + SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path, + int next_idx, SchemaNode* node, bool* missing_field) const; + + /// Returns the index of 'node's child with 'name', or the number of children if not + /// found. The name comparison is case-insensitive because that's how Impala treats + /// db/table/column/field names. If there are several matches with different casing, + /// then the index of the first match is returned. + int FindChildWithName(SchemaNode* node, const string& name) const; + + /// The ResolvePathHelper() logic for arrays. + Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx, + SchemaNode** node, bool* pos_field, bool* missing_field) const; + + /// The ResolvePathHelper() logic for maps. + Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node, + bool* missing_field) const; + + /// The ResolvePathHelper() logic for scalars (just does validation since there's no + /// more actual work to be done). + Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type, + const SchemaPath& path, int idx) const; + + const HdfsTableDescriptor& tbl_desc_; + const TParquetFallbackSchemaResolution::type fallback_schema_resolution_; + const TParquetArrayResolution::type array_resolution_; + const char* filename_; + + /// Root node of our internal schema representation populated in Init(). 
+ SchemaNode schema_; +}; + +} // impala namespace + +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-plain-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-plain-test.cc b/be/src/exec/parquet/parquet-plain-test.cc new file mode 100644 index 0000000..6eb880f --- /dev/null +++ b/be/src/exec/parquet/parquet-plain-test.cc @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <limits.h> +#include <stdio.h> +#include <stdlib.h> +#include <iostream> +#include "exec/parquet/parquet-common.h" +#include "runtime/decimal-value.h" +#include "runtime/string-value.inline.h" +#include "runtime/timestamp-value.h" +#include "testutil/gtest-util.h" + +#include "common/names.h" + +namespace impala { + +template <typename InternalType> +int Encode(const InternalType& v, int encoded_byte_size, uint8_t* buffer, + parquet::Type::type physical_type){ + return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer); +} + +// Handle special case of encoding decimal types stored as BYTE_ARRAY, INT32, and INT64, +// since these are not implemented in Impala. 
+// When parquet_type equals BYTE_ARRAY: 'encoded_byte_size' is the sum of the +// minimum number of bytes required to store the unscaled value and the bytes required to +// store the size. Value 'v' passed to it should not contain leading zeros as this +// method does not strictly conform to the parquet spec in removing those. +// When parquet_type is INT32 or INT64, we simply write the unscaled value to the buffer. +template <typename DecimalType> +int EncodeDecimal(const DecimalType& v, int encoded_byte_size, uint8_t* buffer, + parquet::Type::type parquet_type) { + if (parquet_type == parquet::Type::FIXED_LEN_BYTE_ARRAY) { + return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer); + } else if (parquet_type == parquet::Type::BYTE_ARRAY) { + int decimal_size = encoded_byte_size - sizeof(int32_t); + memcpy(buffer, &decimal_size, sizeof(int32_t)); + DecimalUtil::EncodeToFixedLenByteArray(buffer + sizeof(int32_t), decimal_size, v); + return encoded_byte_size; + } else if (parquet_type == parquet::Type::INT32 || + parquet_type == parquet::Type::INT64) { + return ParquetPlainEncoder::Encode(v.value(), encoded_byte_size, buffer); + } + return -1; +} + +template<> +int Encode(const Decimal4Value& v, int encoded_byte_size, uint8_t* buffer, + parquet::Type::type parquet_type) { + return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type); +} + +template<> +int Encode(const Decimal8Value& v, int encoded_byte_size, uint8_t* buffer, + parquet::Type::type parquet_type) { + return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type); +} + +template<> +int Encode(const Decimal16Value& v, int encoded_byte_size, uint8_t* buffer, + parquet::Type::type parquet_type){ + return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type); +} + +/// Test that the decoder fails when asked to decode a truncated value. 
+template <typename InternalType, parquet::Type::type PARQUET_TYPE> +void TestTruncate(const InternalType& v, int expected_byte_size) { + uint8_t buffer[expected_byte_size]; + int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE); + EXPECT_EQ(encoded_size, expected_byte_size); + + // Check all possible truncations of the buffer. + for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) { + InternalType result; + /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns. + uint8_t* truncated_buffer = new uint8_t[truncated_size]; + memcpy(truncated_buffer, buffer, truncated_size); + int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>( + truncated_buffer, truncated_buffer + truncated_size, expected_byte_size, &result); + EXPECT_EQ(-1, decoded_size); + delete[] truncated_buffer; + } +} + +template <typename InternalType, typename WidenInternalType, + parquet::Type::type PARQUET_TYPE> +void TestTruncate(const InternalType& v, int expected_byte_size) { + uint8_t buffer[expected_byte_size]; + int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE); + EXPECT_EQ(encoded_size, expected_byte_size); + + // Check all possible truncations of the buffer. + for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) { + WidenInternalType result; + /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns. 
+ uint8_t* truncated_buffer = new uint8_t[truncated_size]; + memcpy(truncated_buffer, buffer, truncated_size); + int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>( + truncated_buffer, truncated_buffer + truncated_size, expected_byte_size, + &result); + EXPECT_EQ(-1, decoded_size); + delete[] truncated_buffer; + } +} + +template <typename InternalType, parquet::Type::type PARQUET_TYPE> +void TestType(const InternalType& v, int expected_byte_size) { + uint8_t buffer[expected_byte_size]; + int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE); + EXPECT_EQ(encoded_size, expected_byte_size); + + InternalType result; + int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(buffer, + buffer + expected_byte_size, expected_byte_size, &result); + EXPECT_EQ(decoded_size, expected_byte_size); + EXPECT_EQ(result, v); + + TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size); +} + +template <typename InternalType, typename WidenInternalType, + parquet::Type::type PARQUET_TYPE> +void TestTypeWidening(const InternalType& v, int expected_byte_size) { + uint8_t buffer[expected_byte_size]; + int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE); + EXPECT_EQ(encoded_size, expected_byte_size); + + WidenInternalType result; + int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>( + buffer, buffer + expected_byte_size, expected_byte_size, &result); + EXPECT_EQ(decoded_size, expected_byte_size); + EXPECT_EQ(v, result); + + TestTruncate<InternalType, WidenInternalType, PARQUET_TYPE>( + v, expected_byte_size); +} + +TEST(PlainEncoding, Basic) { + int8_t i8 = 12; + int16_t i16 = 123; + int32_t i32 = 1234; + int64_t i64 = 12345; + float f = 1.23; + double d = 1.23456; + StringValue sv("Hello"); + TimestampValue tv; + + TestType<int8_t, parquet::Type::INT32>(i8, sizeof(int32_t)); + TestType<int16_t, parquet::Type::INT32>(i16, sizeof(int32_t)); + TestType<int32_t, 
parquet::Type::INT32>(i32, sizeof(int32_t)); + TestType<int64_t, parquet::Type::INT64>(i64, sizeof(int64_t)); + TestType<float, parquet::Type::FLOAT>(f, sizeof(float)); + TestType<double, parquet::Type::DOUBLE>(d, sizeof(double)); + TestType<StringValue, parquet::Type::BYTE_ARRAY>(sv, sizeof(int32_t) + sv.len); + TestType<TimestampValue, parquet::Type::INT96>(tv, 12); + + // Test type widening. + TestTypeWidening<int32_t, int64_t, parquet::Type::INT32>(i32, sizeof(int32_t)); + TestTypeWidening<int32_t, double, parquet::Type::INT32>(i32, sizeof(int32_t)); + TestTypeWidening<float, double, parquet::Type::FLOAT>(f, sizeof(float)); + + int test_val = 1234; + int var_len_decimal_size = sizeof(int32_t) + + 2 /*min bytes required for storing test_val*/; + // Decimal4Value: General test case + TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val), + var_len_decimal_size); + TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val * -1), + var_len_decimal_size); + TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(test_val), + sizeof(Decimal4Value)); + TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal4Value(test_val * -1), sizeof(Decimal4Value)); + TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(test_val), + sizeof(int32_t)); + TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(test_val * -1), + sizeof(int32_t)); + + // Decimal8Value: General test case + TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val), + var_len_decimal_size); + TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val * -1), + var_len_decimal_size); + TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(test_val), + sizeof(Decimal8Value)); + TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal8Value(test_val * -1), sizeof(Decimal8Value)); + TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(test_val), + 
sizeof(int64_t)); + TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(test_val * -1), + sizeof(int64_t)); + + // Decimal16Value: General test case + TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val), + var_len_decimal_size); + TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val * -1), + var_len_decimal_size); + TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( Decimal16Value(test_val), + sizeof(Decimal16Value)); + TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal16Value(test_val * -1), sizeof(Decimal16Value)); + + // Decimal8Value: int32 limits test + TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>( + Decimal8Value(std::numeric_limits<int32_t>::max()), + sizeof(int32_t) + sizeof(int32_t)); + TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>( + Decimal8Value(std::numeric_limits<int32_t>::min()), + sizeof(int32_t) + sizeof(int32_t)); + TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value)); + TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value)); + TestType<Decimal8Value, parquet::Type::INT64>( + Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(int64_t)); + TestType<Decimal8Value, parquet::Type::INT64>( + Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(int64_t)); + + // Decimal16Value: int32 limits test + TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int32_t>::max()), + sizeof(int32_t) + sizeof(int32_t)); + TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int32_t>::min()), + sizeof(int32_t) + sizeof(int32_t)); + TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value)); + TestType<Decimal16Value, 
parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value)); + + // Decimal16Value: int64 limits test + TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int64_t>::max()), + sizeof(int32_t) + sizeof(int64_t)); + TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int64_t>::min()), + sizeof(int32_t) + sizeof(int64_t)); + TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value)); + TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( + Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value)); + + // two digit values can be encoded with any byte size. + for (int i = 1; i <=16; ++i) { + if (i <= 4) { + TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(i), + i + sizeof(int32_t)); + TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(-i), + i + sizeof(int32_t)); + TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(i), i); + TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(-i), i); + TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(i), sizeof(int32_t)); + TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(-i), sizeof(int32_t)); + } + if (i <= 8) { + TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(i), + i + sizeof(int32_t)); + TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(-i), + i + sizeof(int32_t)); + TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(i), i); + TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(-i), i); + TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(i), sizeof(int64_t)); + TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(-i), sizeof(int64_t)); + } + TestType<Decimal16Value, 
parquet::Type::BYTE_ARRAY>(Decimal16Value(i), + i + sizeof(int32_t)); + TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(-i), + i + sizeof(int32_t)); + TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(i), i); + TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(-i), i); + } +} + +TEST(PlainEncoding, DecimalBigEndian) { + // Test Basic can pass if we make the same error in encode and decode. + // Verify the bytes are actually big endian. + uint8_t buffer[] = { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 + }; + + // Manually generate this to avoid potential bugs in BitUtil + uint8_t buffer_swapped[] = { + 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 + }; + uint8_t result_buffer[16]; + + Decimal4Value d4; + Decimal8Value d8; + Decimal16Value d16; + + memcpy(&d4, buffer, sizeof(d4)); + memcpy(&d8, buffer, sizeof(d8)); + memcpy(&d16, buffer, sizeof(d16)); + + int size = ParquetPlainEncoder::Encode(d4, sizeof(d4), result_buffer); + ASSERT_EQ(size, sizeof(d4)); + ASSERT_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d4), sizeof(d4)), 0); + + size = ParquetPlainEncoder::Encode(d8, sizeof(d8), result_buffer); + ASSERT_EQ(size, sizeof(d8)); + ASSERT_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d8), sizeof(d8)), 0); + + size = ParquetPlainEncoder::Encode(d16, sizeof(d16), result_buffer); + ASSERT_EQ(size, sizeof(d16)); + ASSERT_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d16), sizeof(d16)), 0); +} + +/// Test that corrupt strings are handled correctly. +TEST(PlainEncoding, CorruptString) { + // Test string with negative length. 
+ uint8_t buffer[sizeof(int32_t) + 10]; + int32_t len = -10; + memcpy(buffer, &len, sizeof(int32_t)); + + StringValue result; + int decoded_size = ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>( + buffer, buffer + sizeof(buffer), 0, &result); + EXPECT_EQ(decoded_size, -1); +} + +} + +IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-scratch-tuple-batch.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-scratch-tuple-batch.h b/be/src/exec/parquet/parquet-scratch-tuple-batch.h new file mode 100644 index 0000000..1b79be1 --- /dev/null +++ b/be/src/exec/parquet/parquet-scratch-tuple-batch.h @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H +#define IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H + +#include "runtime/descriptors.h" +#include "runtime/row-batch.h" +#include "runtime/tuple-row.h" + +namespace impala { + +/// Helper struct that holds a batch of tuples allocated from a mem pool, as well +/// as state associated with iterating over its tuples and transferring +/// them to an output batch in TransferScratchTuples(). +struct ScratchTupleBatch { + // Memory for the fixed-length parts of the batch of tuples. Allocated from + // 'tuple_mem_pool'. Set to NULL when transferred to an output batch. + uint8_t* tuple_mem = nullptr; + // Number of tuples that can be stored in 'tuple_mem'. + int capacity; + // Keeps track of the current tuple index. + int tuple_idx = 0; + // Number of valid tuples in tuple_mem. + int num_tuples = 0; + // Number of tuples transferred to output batches (i.e. not filtered by predicates). + // num_tuples_transferred > 0 before a call to FinalizeTupleTransfer() implies that + // tuples from the current scratch batch were transferred to a previous output batch. + int num_tuples_transferred = 0; + // Bytes of fixed-length data per tuple. + const int tuple_byte_size; + + // Pool used to allocate 'tuple_mem' and nothing else. + MemPool tuple_mem_pool; + + // Pool used to accumulate other memory that may be referenced by var-len slots in this + // batch, e.g. decompression buffers, allocations for var-len strings and allocations + // for nested arrays. This memory may be referenced by previous batches or the current + // batch, but not by future batches. E.g. a decompression buffer can be safely attached + // only once all values referencing that buffer have been materialized into the batch. + MemPool aux_mem_pool; + + // Tuples transferred to an output row batch are compacted if + // (# tuples materialized / # tuples returned) exceeds this number. 
Chosen so that the + // cost of copying the tuples should be very small in relation to the original cost of + // materialising them. + const int MIN_SELECTIVITY_TO_COMPACT = 16; + + ScratchTupleBatch( + const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker) + : capacity(batch_size), + tuple_byte_size(row_desc.GetRowSize()), + tuple_mem_pool(mem_tracker), + aux_mem_pool(mem_tracker) { + DCHECK_EQ(row_desc.tuple_descriptors().size(), 1); + } + + Status Reset(RuntimeState* state) { + tuple_idx = 0; + num_tuples = 0; + num_tuples_transferred = 0; + if (tuple_mem == nullptr) { + int64_t dummy; + RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer( + state, &tuple_mem_pool, tuple_byte_size, &capacity, &dummy, &tuple_mem)); + } + return Status::OK(); + } + + /// Release all memory in the MemPools. If 'dst_pool' is non-NULL, transfers it to + /// 'dst_pool'. Otherwise frees the memory. + void ReleaseResources(MemPool* dst_pool) { + if (dst_pool == nullptr) { + tuple_mem_pool.FreeAll(); + aux_mem_pool.FreeAll(); + } else { + dst_pool->AcquireData(&tuple_mem_pool, false); + dst_pool->AcquireData(&aux_mem_pool, false); + } + tuple_mem = nullptr; + } + + /// Finalize transfer of 'num_to_commit' tuples to 'dst_batch' and transfer memory to + /// 'dst_batch' if at the end of 'scratch_batch'. The tuples must not yet be + /// committed to 'dst_batch'. Only needs to be called when materialising non-empty + /// tuples. + void FinalizeTupleTransfer(RowBatch* dst_batch, int num_to_commit) { + DCHECK_GE(num_to_commit, 0); + DCHECK_LE(dst_batch->num_rows() + num_to_commit, dst_batch->capacity()); + DCHECK_LE(num_tuples_transferred + num_to_commit, num_tuples); + DCHECK(tuple_mem != nullptr); + num_tuples_transferred += num_to_commit; + if (!AtEnd()) return; + // We're at the end of the scratch batch. Transfer memory that may be referenced by + // transferred tuples or that we can't reuse to 'dst_batch'. 
+ + // Future tuples won't reference data in 'aux_mem_pool' - always transfer so that + // we don't accumulate unneeded memory in the scratch batch. + dst_batch->tuple_data_pool()->AcquireData(&aux_mem_pool, false); + + // Try to avoid the transfer of 'tuple_mem' for selective scans by compacting the + // output batch. This avoids excessive allocation and transfer of memory, which + // can lead to performance problems like IMPALA-4923. + // Compaction is unsafe if the scratch batch was split across multiple output batches + // because the batch we returned earlier may hold a reference into 'tuple_mem'. + if (num_tuples_transferred > num_to_commit + || num_tuples_transferred * MIN_SELECTIVITY_TO_COMPACT > num_tuples + || !TryCompact(dst_batch, num_to_commit)) { + // Didn't compact - rows in 'dst_batch' reference 'tuple_mem'. + dst_batch->tuple_data_pool()->AcquireData(&tuple_mem_pool, false); + tuple_mem = nullptr; + } + } + + /// Try to compact 'num_uncommitted_tuples' uncommitted tuples that were added to + /// the end of 'dst_batch' by copying them to memory allocated from + /// dst_batch->tuple_data_pool(). Returns true on success or false if the memory + /// could not be allocated. + bool TryCompact(RowBatch* dst_batch, int num_uncommitted_tuples) { + DCHECK_LE(dst_batch->num_rows() + num_uncommitted_tuples, dst_batch->capacity()); + // Copy rows that reference 'tuple_mem' into a new small buffer. This code handles + // the case where num_uncommitted_tuples == 0, since TryAllocate() returns a non-null + // pointer. 
+ int64_t dst_bytes = num_uncommitted_tuples * static_cast<int64_t>(tuple_byte_size); + uint8_t* dst_buffer = dst_batch->tuple_data_pool()->TryAllocate(dst_bytes); + if (dst_buffer == nullptr) return false; + const int end_row = dst_batch->num_rows() + num_uncommitted_tuples; + for (int i = dst_batch->num_rows(); i < end_row; ++i) { + TupleRow* row = dst_batch->GetRow(i); + Tuple* uncompacted_tuple = row->GetTuple(0); + DCHECK_GE(reinterpret_cast<uint8_t*>(uncompacted_tuple), tuple_mem); + DCHECK_LT(reinterpret_cast<uint8_t*>(uncompacted_tuple), + tuple_mem + tuple_byte_size * capacity); + row->SetTuple(0, reinterpret_cast<Tuple*>(dst_buffer)); + memcpy(dst_buffer, uncompacted_tuple, tuple_byte_size); + dst_buffer += tuple_byte_size; + } + return true; + } + + Tuple* GetTuple(int tuple_idx) const { + return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size); + } + + uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; } + uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; } + bool AtEnd() const { return tuple_idx == num_tuples; } + int64_t total_allocated_bytes() const { + return tuple_mem_pool.total_allocated_bytes() + aux_mem_pool.total_allocated_bytes(); + } +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-version-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-version-test.cc b/be/src/exec/parquet/parquet-version-test.cc new file mode 100644 index 0000000..5eaa692 --- /dev/null +++ b/be/src/exec/parquet/parquet-version-test.cc @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <limits.h> +#include <stdio.h> +#include <stdlib.h> +#include <iostream> + +#include "exec/parquet/parquet-metadata-utils.h" +#include "testutil/gtest-util.h" + +#include "common/names.h" + +namespace impala { + +void CheckVersionParse(const string& s, const string& expected_application, + int expected_major, int expected_minor, int expected_patch, + bool expected_is_internal) { + ParquetFileVersion v(s); + EXPECT_EQ(v.application, expected_application) << "String: " << s; + EXPECT_EQ(v.version.major, expected_major) << "String: " << s; + EXPECT_EQ(v.version.minor, expected_minor) << "String: " << s; + EXPECT_EQ(v.version.patch, expected_patch) << "String: " << s; + EXPECT_EQ(v.is_impala_internal, expected_is_internal); +} + +TEST(ParquetVersionTest, Parsing) { + CheckVersionParse("impala version 1.0", "impala", 1, 0, 0, false); + CheckVersionParse("impala VERSION 1.0", "impala", 1, 0, 0, false); + CheckVersionParse("impala VERSION 1.0 ignored", "impala", 1, 0, 0, false); + CheckVersionParse("parquet-mr version 2.0", "parquet-mr", 2, 0, 0, false); + + CheckVersionParse("impala version 1.2", "impala", 1, 2, 0, false); + CheckVersionParse("impala version 1.2.3", "impala", 1, 2, 3, false); + CheckVersionParse("impala version 1.2.3-cdh4.5", "impala", 1, 2, 3, false); + CheckVersionParse("impala version 1.2.3.cdh4.5", "impala", 1, 2, 3, false); + CheckVersionParse("impala 
version 1.2-cdh4.5", "impala", 1, 2, 0, false); + CheckVersionParse("impala version 1.2.cdh4.5", "impala", 1, 2, 0, false); + CheckVersionParse("impala version 1.2 (build xyz)", "impala", 1, 2, 0, false); + CheckVersionParse("impala version cdh4.5", "impala", 0, 0, 0, false); + + // Test internal versions + CheckVersionParse("impala version 1.0-internal", "impala", 1, 0, 0, true); + CheckVersionParse("impala version 1.23-internal", "impala", 1, 23, 0, true); + CheckVersionParse("impala version 2-inTERnal", "impala", 2, 0, 0, true); + CheckVersionParse("mr version 1-internal", "mr", 1, 0, 0, false); + + // Test some malformed strings. + CheckVersionParse("parquet-mr 2.0", "parquet-mr", 0, 0, 0, false); + CheckVersionParse("impala ve 2.0", "impala", 0, 0, 0, false); + CheckVersionParse("", "", 0, 0, 0, false); +} + +TEST(ParquetVersionTest, Comparisons) { + ParquetFileVersion v("foo version 1.2.3"); + EXPECT_TRUE(v.VersionEq(1, 2, 3)); + EXPECT_FALSE(v.VersionEq(1, 2, 4)); + EXPECT_TRUE(v.VersionLt(3, 2, 1)); + EXPECT_TRUE(v.VersionLt(1, 2, 4)); + EXPECT_TRUE(v.VersionLt(2, 0, 0)); + EXPECT_FALSE(v.VersionLt(0, 0, 0)); + EXPECT_FALSE(v.VersionLt(1, 2, 3)); + EXPECT_FALSE(v.VersionLt(1, 2, 2)); + EXPECT_FALSE(v.VersionLt(0, 4, 4)); +} + +} + +IMPALA_TEST_MAIN(); + http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index dd70b36..74347cd 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -31,6 +31,7 @@ add_library(Util backend-gflag-util.cc benchmark.cc bitmap.cc + bit-packing.cc bit-util.cc bloom-filter.cc bloom-filter-ir.cc http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/bit-packing-test.cc b/be/src/util/bit-packing-test.cc index 
bedf178..f1ec10a 100644 --- a/be/src/util/bit-packing-test.cc +++ b/be/src/util/bit-packing-test.cc @@ -21,7 +21,7 @@ #include "testutil/gtest-util.h" #include "testutil/mem-util.h" -#include "util/bit-packing.inline.h" +#include "util/bit-packing.h" #include "util/bit-stream-utils.inline.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.cc ---------------------------------------------------------------------- diff --git a/be/src/util/bit-packing.cc b/be/src/util/bit-packing.cc new file mode 100644 index 0000000..cb3233d --- /dev/null +++ b/be/src/util/bit-packing.cc @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/bit-packing.inline.h" + +#include "runtime/decimal-value.h" +#include "runtime/string-value.h" +#include "runtime/timestamp-value.h" + +namespace impala { + +// Instantiate all of the templated functions needed by the rest of Impala. 
+#define INSTANTIATE_UNPACK_VALUES(OUT_TYPE) \ + template std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues<OUT_TYPE>( \ + int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, \ + int64_t num_values, OUT_TYPE* __restrict__ out); + +INSTANTIATE_UNPACK_VALUES(bool); +INSTANTIATE_UNPACK_VALUES(uint8_t); +INSTANTIATE_UNPACK_VALUES(uint32_t); + +#define INSTANTIATE_UNPACK_AND_DECODE(OUT_TYPE) \ + template std::pair<const uint8_t*, int64_t> \ + BitPacking::UnpackAndDecodeValues<OUT_TYPE>(int bit_width, \ + const uint8_t* __restrict__ in, int64_t in_bytes, OUT_TYPE* __restrict__ dict, \ + int64_t dict_len, int64_t num_values, OUT_TYPE* __restrict__ out, int64_t stride, \ + bool* __restrict__ decode_error); + +INSTANTIATE_UNPACK_AND_DECODE(bool); +INSTANTIATE_UNPACK_AND_DECODE(double); +INSTANTIATE_UNPACK_AND_DECODE(float); +INSTANTIATE_UNPACK_AND_DECODE(int8_t); +INSTANTIATE_UNPACK_AND_DECODE(int16_t); +INSTANTIATE_UNPACK_AND_DECODE(int32_t); +INSTANTIATE_UNPACK_AND_DECODE(int64_t); +INSTANTIATE_UNPACK_AND_DECODE(Decimal4Value); +INSTANTIATE_UNPACK_AND_DECODE(Decimal8Value); +INSTANTIATE_UNPACK_AND_DECODE(Decimal16Value); +INSTANTIATE_UNPACK_AND_DECODE(StringValue); +INSTANTIATE_UNPACK_AND_DECODE(TimestampValue); + +// Required for bit-packing-benchmark.cc. +template +const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in, + int64_t in_bytes, uint32_t* __restrict__ out); +} http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h index 38b39e2..c70b55e 100644 --- a/be/src/util/bit-packing.h +++ b/be/src/util/bit-packing.h @@ -15,15 +15,14 @@ // specific language governing permissions and limitations // under the License. 
-#ifndef IMPALA_UTIL_BIT_PACKING_H -#define IMPALA_UTIL_BIT_PACKING_H - -namespace impala { +#pragma once +#include <cstddef> #include <cstdint> - #include <utility> +namespace impala { + /// Utilities for manipulating bit-packed values. Bit-packing is a technique for /// compressing integer values that do not use the full range of the integer type. /// E.g. an array of uint32_t values with range [0, 31] only uses the lower 5 bits @@ -131,5 +130,3 @@ class BitPacking { static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values); }; } - -#endif http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.inline.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h index 6fa31cc..8cebe40 100644 --- a/be/src/util/bit-packing.inline.h +++ b/be/src/util/bit-packing.inline.h @@ -15,8 +15,12 @@ // specific language governing permissions and limitations // under the License. -#ifndef IMPALA_UTIL_BIT_PACKING_INLINE_H -#define IMPALA_UTIL_BIT_PACKING_INLINE_H +// This contains all the template implementations for functions defined in bit-packing.h. +// This should be included by files that want to instantiate those templates directly. +// Including this file is not generally necessary - instead the templates should be +// instantiated in bit-packing.cc so that compile times stay manageable. 
+ +#pragma once #include "util/bit-packing.h" @@ -345,6 +349,4 @@ const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict return in + BYTES_TO_READ; #pragma pop_macro("DECODE_VALUES_CASE") } -} - -#endif +} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-stream-utils.inline.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h index aa53c52..48f52da 100644 --- a/be/src/util/bit-stream-utils.inline.h +++ b/be/src/util/bit-stream-utils.inline.h @@ -21,7 +21,7 @@ #include "util/bit-stream-utils.h" #include "common/compiler-util.h" -#include "util/bit-packing.inline.h" +#include "util/bit-packing.h" namespace impala { http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/dict-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h index bf40301..cc7ef82 100644 --- a/be/src/util/dict-encoding.h +++ b/be/src/util/dict-encoding.h @@ -23,7 +23,7 @@ #include <boost/unordered_map.hpp> #include "common/compiler-util.h" -#include "exec/parquet-common.h" +#include "exec/parquet/parquet-common.h" #include "gutil/strings/substitute.h" #include "runtime/mem-pool.h" #include "runtime/string-value.h" http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/dict-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc index ecf24f5..c30c30b 100644 --- a/be/src/util/dict-test.cc +++ b/be/src/util/dict-test.cc @@ -25,6 +25,7 @@ #include "runtime/string-value.inline.h" #include "runtime/timestamp-value.h" #include "testutil/gtest-util.h" +#include "util/bit-packing.inline.h" #include "util/dict-encoding.h" #include "common/names.h" 
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/parquet-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc index a50838c..8bb7665 100644 --- a/be/src/util/parquet-reader.cc +++ b/be/src/util/parquet-reader.cc @@ -35,7 +35,7 @@ #include <thrift/transport/TBufferTransports.h> #pragma clang diagnostic pop -#include "exec/parquet-common.h" +#include "exec/parquet/parquet-common.h" #include "runtime/mem-pool.h" #include "util/codec.h" #include "util/rle-encoding.h" http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/rle-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc index c52659b..4406e46 100644 --- a/be/src/util/rle-test.cc +++ b/be/src/util/rle-test.cc @@ -23,8 +23,9 @@ #include <math.h> #include "testutil/gtest-util.h" -#include "util/rle-encoding.h" +#include "util/bit-packing.inline.h" #include "util/bit-stream-utils.inline.h" +#include "util/rle-encoding.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 768a569..d8d40b9 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -370,6 +370,9 @@ error_codes = ( ("PARQUET_TIMESTAMP_INVALID_TIME_OF_DAY", 121, "Parquet file '$0' column '$1' contains a timestamp with invalid time of day. 
" "The time of day should be 0 <= and < 24 hour (in nanoseconds)."), + + ("PARQUET_CORRUPT_BOOL_VALUE", 122, "File '$0' is corrupt: error decoding BOOLEAN " + "value with encoding $1 at offset $2"), ) import sys http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test index 47fbc1a..3feb7c2 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test @@ -2,5 +2,5 @@ ---- QUERY select * from num_values_def_levels_mismatch ---- CATCH -could not read all def levels for column '_c0' +Could not read definition level, even though metadata states there are 1 values remaining in data page. ====
