http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
new file mode 100644
index 0000000..5f10f62
--- /dev/null
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -0,0 +1,647 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "exec/parquet-metadata-utils.h"
+
+#include <string>
+#include <sstream>
+#include <vector>
+
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "exec/parquet-common.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+
+using std::endl;
+using std::string;
+using std::stringstream;
+using std::vector;
+using strings::Substitute;
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
+
+namespace impala {
+
+Status ParquetMetadataUtils::ValidateFileVersion(
+    const parquet::FileMetaData& file_metadata, const char* filename) {
+  if (file_metadata.version > PARQUET_CURRENT_VERSION) {
+    stringstream ss;
+    ss << "File: " << filename << " is of an unsupported version. "
+       << "file version: " << file_metadata.version;
+    return Status(ss.str());
+  }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename,
+    int64_t file_length, const parquet::RowGroup& row_group) {
+  for (int i = 0; i < row_group.columns.size(); ++i) {
+    const parquet::ColumnChunk& col_chunk = row_group.columns[i];
+    int64_t col_start = col_chunk.meta_data.data_page_offset;
+    // The file format requires that if a dictionary page exists, it be before data pages.
+    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+      if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
+        stringstream ss;
+        ss << "File " << filename << ": metadata is corrupt. "
+           << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
+           << ") must come before any data pages (offset=" << col_start << ").";
+        return Status(ss.str());
+      }
+      col_start = col_chunk.meta_data.dictionary_page_offset;
+    }
+    int64_t col_len = col_chunk.meta_data.total_compressed_size;
+    int64_t col_end = col_start + col_len;
+    if (col_end <= 0 || col_end > file_length) {
+      stringstream ss;
+      ss << "File " << filename << ": metadata is corrupt. "
+         << "Column " << i << " has invalid column offsets "
+         << "(offset=" << col_start << ", size=" << col_len << ", "
+         << "file_size=" << file_length << ").";
+      return Status(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+static bool IsEncodingSupported(parquet::Encoding::type e) {
+  switch (e) {
+    case parquet::Encoding::PLAIN:
+    case parquet::Encoding::PLAIN_DICTIONARY:
+    case parquet::Encoding::BIT_PACKED:
+    case parquet::Encoding::RLE:
+      return true;
+    default:
+      return false;
+  }
+}
+
+Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_metadata,
+    const char* filename, int row_group_idx, int col_idx,
+    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+    RuntimeState* state) {
+  const parquet::ColumnChunk& file_data =
+      file_metadata.row_groups[row_group_idx].columns[col_idx];
+
+  // Check the encodings are supported.
+  const vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings;
+  for (int i = 0; i < encodings.size(); ++i) {
+    if (!IsEncodingSupported(encodings[i])) {
+      stringstream ss;
+      ss << "File '" << filename << "' uses an unsupported encoding: "
+         << PrintEncoding(encodings[i]) << " for column '" << schema_element.name
+         << "'.";
+      return Status(ss.str());
+    }
+  }
+
+  // Check the compression is supported.
+  if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED &&
+      file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
+      file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
+    stringstream ss;
+    ss << "File '" << filename << "' uses an unsupported compression: "
+       << file_data.meta_data.codec << " for column '" << schema_element.name
+       << "'.";
+    return Status(ss.str());
+  }
+
+  // Validation after this point is only if col_reader is reading values.
+  if (slot_desc == NULL) return Status::OK();
+
+  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
+  DCHECK_EQ(type, file_data.meta_data.type)
+      << "Should have been validated in ResolvePath()";
+
+  // Check the decimal scale in the file matches the metastore scale and precision.
+  // We fail the query if the metadata makes it impossible for us to safely read
+  // the file. If we don't require the metadata, we will fail the query if
+  // abort_on_error is true, otherwise we will just log a warning.
+  bool is_converted_type_decimal = schema_element.__isset.converted_type &&
+      schema_element.converted_type == parquet::ConvertedType::DECIMAL;
+  if (slot_desc->type().type == TYPE_DECIMAL) {
+    // We require that the scale and byte length be set.
+    if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
+      return Status(ss.str());
+    }
+
+    if (!schema_element.__isset.type_length) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' does not have type_length set.";
+      return Status(ss.str());
+    }
+
+    int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
+    if (schema_element.type_length != expected_len) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' has an invalid type length. Expecting: " << expected_len
+         << " len in file: " << schema_element.type_length;
+      return Status(ss.str());
+    }
+
+    if (!schema_element.__isset.scale) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' does not have the scale set.";
+      return Status(ss.str());
+    }
+
+    if (schema_element.scale != slot_desc->type().scale) {
+      // TODO: we could allow a mismatch and do a conversion at this step.
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' has a scale that does not match the table metadata scale."
+         << " File metadata scale: " << schema_element.scale
+         << " Table metadata scale: " << slot_desc->type().scale;
+      return Status(ss.str());
+    }
+
+    // The other decimal metadata should be there but we don't need it.
+    if (!schema_element.__isset.precision) {
+      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename,
+          schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    } else {
+      if (schema_element.precision != slot_desc->type().precision) {
+        // TODO: we could allow a mismatch and do a conversion at this step.
+        ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name,
+            schema_element.precision, slot_desc->type().precision);
+        RETURN_IF_ERROR(state->LogOrReturnError(msg));
+      }
+    }
+
+    if (!is_converted_type_decimal) {
+      // TODO: is this validation useful? It is not required at all to read the data and
+      // might only serve to reject otherwise perfectly readable files.
+      ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename,
+          schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    }
+  } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
+      is_converted_type_decimal) {
+    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename,
+        schema_element.name, slot_desc->type().DebugString());
+    RETURN_IF_ERROR(state->LogOrReturnError(msg));
+  }
+  return Status::OK();
+}
+
+ParquetFileVersion::ParquetFileVersion(const string& created_by) {
+  string created_by_lower = created_by;
+  std::transform(created_by_lower.begin(), created_by_lower.end(),
+      created_by_lower.begin(), ::tolower);
+  is_impala_internal = false;
+
+  vector<string> tokens;
+  split(tokens, created_by_lower, is_any_of(" "), token_compress_on);
+  // Boost always creates at least one token
+  DCHECK_GT(tokens.size(), 0);
+  application = tokens[0];
+
+  if (tokens.size() >= 3 && tokens[1] == "version") {
+    string version_string = tokens[2];
+    // Ignore any trailing non-numeric characters
+    int n = version_string.find_first_not_of("0123456789.");
+    string version_string_trimmed = version_string.substr(0, n);
+
+    vector<string> version_tokens;
+    split(version_tokens, version_string_trimmed, is_any_of("."));
+    version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
+    version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
+    version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;
+
+    if (application == "impala") {
+      if (version_string.find("-internal") != string::npos) is_impala_internal = true;
+    }
+  } else {
+    version.major = 0;
+    version.minor = 0;
+    version.patch = 0;
+  }
+}
+
+bool ParquetFileVersion::VersionLt(int major, int minor, int patch) const {
+  if (version.major < major) return true;
+  if (version.major > major) return false;
+  DCHECK_EQ(version.major, major);
+  if (version.minor < minor) return true;
+  if (version.minor > minor) return false;
+  DCHECK_EQ(version.minor, minor);
+  return version.patch < patch;
+}
+
+bool ParquetFileVersion::VersionEq(int major, int minor, int patch) const {
+  return version.major == major && version.minor == minor && version.patch == patch;
+}
+
+static string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) {
+  switch (t) {
+    case parquet::FieldRepetitionType::REQUIRED: return "required";
+    case parquet::FieldRepetitionType::OPTIONAL: return "optional";
+    case parquet::FieldRepetitionType::REPEATED: return "repeated";
+    default: return "<unknown>";
+  }
+}
+
+static string PrintParquetType(const parquet::Type::type& t) {
+  switch (t) {
+    case parquet::Type::BOOLEAN: return "boolean";
+    case parquet::Type::INT32: return "int32";
+    case parquet::Type::INT64: return "int64";
+    case parquet::Type::INT96: return "int96";
+    case parquet::Type::FLOAT: return "float";
+    case parquet::Type::DOUBLE: return "double";
+    case parquet::Type::BYTE_ARRAY: return "byte_array";
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array";
+    default: return "<unknown>";
+  }
+}
+
+string SchemaNode::DebugString(int indent) const {
+  stringstream ss;
+  for (int i = 0; i < indent; ++i) ss << " ";
+  ss << PrintRepetitionType(element->repetition_type) << " ";
+  if (element->num_children > 0) {
+    ss << "struct";
+  } else {
+    ss << PrintParquetType(element->type);
+  }
+  ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level
+     << " r:" << max_rep_level << "]";
+  if (element->num_children > 0) {
+    ss << " {" << endl;
+    for (int i = 0; i < element->num_children; ++i) {
+      ss << children[i].DebugString(indent + 2) << endl;
+    }
+    for (int i = 0; i < indent; ++i) ss << " ";
+    ss << "}";
+  }
+  return ss.str();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(const vector<parquet::SchemaElement>& schema,
+    SchemaNode* node) const {
+  int idx = 0;
+  int col_idx = 0;
+  RETURN_IF_ERROR(CreateSchemaTree(schema, 0, 0, 0, &idx, &col_idx, node));
+  if (node->children.empty()) {
+    return Status(Substitute("Invalid file: '$0' has no columns.", filename_));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(
+    const vector<parquet::SchemaElement>& schema, int max_def_level, int max_rep_level,
+    int ira_def_level, int* idx, int* col_idx, SchemaNode* node)
+    const {
+  if (*idx >= schema.size()) {
+    return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "
+        "flattened schema in file metadata", filename_));
+  }
+  node->element = &schema[*idx];
+  ++(*idx);
+
+  if (node->element->num_children == 0) {
+    // node is a leaf node, meaning it's materialized in the file and appears in
+    // file_metadata_.row_groups.columns
+    node->col_idx = *col_idx;
+    ++(*col_idx);
+  }
+
+  // def_level_of_immediate_repeated_ancestor does not include this node, so set before
+  // updating ira_def_level
+  node->def_level_of_immediate_repeated_ancestor = ira_def_level;
+
+  if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) {
+    ++max_def_level;
+  } else if (node->element->repetition_type == parquet::FieldRepetitionType::REPEATED) {
+    ++max_rep_level;
+    // Repeated fields add a definition level. This is used to distinguish between an
+    // empty list and a list with an item in it.
+    ++max_def_level;
+    // node is the new most immediate repeated ancestor
+    ira_def_level = max_def_level;
+  }
+  node->max_def_level = max_def_level;
+  node->max_rep_level = max_rep_level;
+
+  node->children.resize(node->element->num_children);
+  for (int i = 0; i < node->element->num_children; ++i) {
+    RETURN_IF_ERROR(CreateSchemaTree(schema, max_def_level, max_rep_level, ira_def_level,
+        idx, col_idx, &node->children[i]));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ResolvePath(const SchemaPath& path, SchemaNode** node,
+    bool* pos_field, bool* missing_field) const {
+  *missing_field = false;
+  // First try two-level array encoding.
+  bool missing_field_two_level;
+  Status status_two_level =
+      ResolvePathHelper(TWO_LEVEL, path, node, pos_field, &missing_field_two_level);
+  if (missing_field_two_level) DCHECK(status_two_level.ok());
+  if (status_two_level.ok() && !missing_field_two_level) return Status::OK();
+  // The two-level resolution failed or reported a missing field, try three-level array
+  // encoding.
+  bool missing_field_three_level;
+  Status status_three_level =
+      ResolvePathHelper(THREE_LEVEL, path, node, pos_field, &missing_field_three_level);
+  if (missing_field_three_level) DCHECK(status_three_level.ok());
+  if (status_three_level.ok() && !missing_field_three_level) return Status::OK();
+  // The three-level resolution failed or reported a missing field, try one-level array
+  // encoding.
+  bool missing_field_one_level;
+  Status status_one_level =
+      ResolvePathHelper(ONE_LEVEL, path, node, pos_field, &missing_field_one_level);
+  if (missing_field_one_level) DCHECK(status_one_level.ok());
+  if (status_one_level.ok() && !missing_field_one_level) return Status::OK();
+  // None of the resolutions yielded a node. Set *missing_field to true if any of the
+  // resolutions reported a missing field.
+  if (missing_field_one_level || missing_field_two_level || missing_field_three_level) {
+    *node = NULL;
+    *missing_field = true;
+    return Status::OK();
+  }
+  // All resolutions failed. Log and return the status from the three-level resolution
+  // (which is technically the standard).
+  DCHECK(!status_one_level.ok() && !status_two_level.ok() && !status_three_level.ok());
+  *node = NULL;
+  VLOG_QUERY << status_three_level.msg().msg() << "\n" << GetStackTrace();
+  return status_three_level;
+}
+
+Status ParquetSchemaResolver::ResolvePathHelper(ArrayEncoding array_encoding,
+    const SchemaPath& path, SchemaNode** node, bool* pos_field, bool* missing_field) const {
+  DCHECK(schema_.element != NULL)
+      << "schema_ must be initialized before calling ResolvePath()";
+
+  *pos_field = false;
+  *missing_field = false;
+  *node = const_cast<SchemaNode*>(&schema_);
+  const ColumnType* col_type = NULL;
+
+  // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by
+  // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema.
+  for (int i = 0; i < path.size(); ++i) {
+    // Advance '*node' if necessary
+    if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
+      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
+      if (*missing_field) return Status::OK();
+    } else {
+      // We just resolved an array, meaning *node is set to the repeated field of the
+      // array. Since we are trying to resolve using one- or two-level array encoding, the
+      // repeated field represents both the array and the array's item (i.e. there is no
+      // explicit item field), so we don't advance *node in this case.
+      DCHECK(col_type != NULL);
+      DCHECK_EQ(col_type->type, TYPE_ARRAY);
+      DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL);
+      DCHECK((*node)->is_repeated());
+    }
+
+    // Advance 'col_type'
+    int table_idx = path[i];
+    col_type = i == 0 ? &tbl_desc_.col_descs()[table_idx].type()
+        : &col_type->children[table_idx];
+
+    // Resolve path[i]
+    if (col_type->type == TYPE_ARRAY) {
+      DCHECK_EQ(col_type->children.size(), 1);
+      RETURN_IF_ERROR(
+          ResolveArray(array_encoding, path, i, node, pos_field, missing_field));
+      if (*missing_field || *pos_field) return Status::OK();
+    } else if (col_type->type == TYPE_MAP) {
+      DCHECK_EQ(col_type->children.size(), 2);
+      RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field));
+      if (*missing_field) return Status::OK();
+    } else if (col_type->type == TYPE_STRUCT) {
+      DCHECK_GT(col_type->children.size(), 0);
+      // Nothing to do for structs
+    } else {
+      DCHECK(!col_type->IsComplexType());
+      DCHECK_EQ(i, path.size() - 1);
+      RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i));
+    }
+  }
+  DCHECK(*node != NULL);
+  return Status::OK();
+}
+
+SchemaNode* ParquetSchemaResolver::NextSchemaNode(
+    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
+    bool* missing_field) const {
+  DCHECK_LT(next_idx, path.size());
+  if (next_idx != 0) DCHECK(col_type != NULL);
+
+  int file_idx;
+  int table_idx = path[next_idx];
+  if (fallback_schema_resolution_ == TParquetFallbackSchemaResolution::type::NAME) {
+    if (next_idx == 0) {
+      // Resolve top-level table column by name.
+      DCHECK_LT(table_idx, tbl_desc_.col_descs().size());
+      const string& name = tbl_desc_.col_descs()[table_idx].name();
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_STRUCT) {
+      // Resolve struct field by name.
+      DCHECK_LT(table_idx, col_type->field_names.size());
+      const string& name = col_type->field_names[table_idx];
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_ARRAY) {
+      // Arrays have only one child in the file.
+      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+      file_idx = table_idx;
+    } else {
+      DCHECK_EQ(col_type->type, TYPE_MAP);
+      // Maps have two values, "key" and "value". These are supposed to be ordered and may
+      // not have the right field names, but try to resolve by name in case they're
+      // switched and otherwise use the order. See
+      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+      // more details.
+      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
+             table_idx == SchemaPathConstants::MAP_VALUE);
+      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
+      file_idx = FindChildWithName(node, name);
+      if (file_idx >= node->children.size()) {
+        // Couldn't resolve by name, fall back to resolution by position.
+        file_idx = table_idx;
+      }
+    }
+  } else {
+    // Resolution by position.
+    DCHECK_EQ(fallback_schema_resolution_,
+        TParquetFallbackSchemaResolution::type::POSITION);
+    if (next_idx == 0) {
+      // For top-level columns, the first index in a path includes the table's partition
+      // keys.
+      file_idx = table_idx - tbl_desc_.num_clustering_cols();
+    } else {
+      file_idx = table_idx;
+    }
+  }
+
+  if (file_idx >= node->children.size()) {
+    string schema_resolution_mode = "unknown";
+    auto entry = _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.find(
+        fallback_schema_resolution_);
+    if (entry != _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.end()) {
+      schema_resolution_mode = entry->second;
+    }
+    VLOG_FILE << Substitute(
+        "File '$0' does not contain path '$1' (resolving by $2)", filename_,
+        PrintPath(tbl_desc_, path), schema_resolution_mode);
+    *missing_field = true;
+    return NULL;
+  }
+  return &node->children[file_idx];
+}
+
+int ParquetSchemaResolver::FindChildWithName(SchemaNode* node,
+    const string& name) const {
+  int idx;
+  for (idx = 0; idx < node->children.size(); ++idx) {
+    if (node->children[idx].element->name == name) break;
+  }
+  return idx;
+}
+
+// There are three types of array encodings:
+//
+// 1. One-level encoding
+//      A bare repeated field. This is interpreted as a required array of required
+//      items.
+//    Example:
+//      repeated <item-type> item;
+//
+// 2. Two-level encoding
+//      A group containing a single repeated field. This is interpreted as a
+//      <list-repetition> array of required items (<list-repetition> is either
+//      optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated <item-type> item;
+//      }
+//
+// 3. Three-level encoding
+//      The "official" encoding according to the parquet spec. A group containing a
+//      single repeated group containing the item field. This is interpreted as a
+//      <list-repetition> array of <item-repetition> items (<list-repetition> and
+//      <item-repetition> are each either optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated group list {
+//          <item-repetition> <item-type> item;
+//        }
+//      }
+//
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. Note that in any of the encodings, <item-type> may be a
+// group containing more fields, which corresponds to a complex item type. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for
+// more details and examples.
+//
+// This function resolves the array at '*node' assuming one-, two-, or three-level
+// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all
+// three encodings (unless '*pos_field' or '*missing_field' are set to true).
+Status ParquetSchemaResolver::ResolveArray(ArrayEncoding array_encoding,
+    const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field,
+    bool* missing_field) const {
+  if (array_encoding == ONE_LEVEL) {
+    if (!(*node)->is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+  } else {
+    // In the multi-level case, we always expect the outer group to contain a single
+    // repeated field
+    if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+    // Set *node to the repeated field
+    *node = &(*node)->children[0];
+  }
+  DCHECK((*node)->is_repeated());
+
+  if (idx + 1 < path.size()) {
+    if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) {
+      // The next index in 'path' is the artificial position field.
+      DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!";
+      *pos_field = true;
+      *node = NULL;
+      return Status::OK();
+    } else {
+      // The next value in 'path' should be the item index
+      DCHECK_EQ(path[idx + 1], SchemaPathConstants::ARRAY_ITEM);
+    }
+  }
+  return Status::OK();
+}
+
+// According to the parquet spec, map columns are represented like:
+// <map-repetition> group <name> (MAP) {
+//   repeated group key_value {
+//     required <key-type> key;
+//     <value-repetition> <value-type> value;
+//   }
+// }
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+// more details.
+Status ParquetSchemaResolver::ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+    bool* missing_field) const {
+  if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated() ||
+      (*node)->children[0].children.size() != 2) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), "map", (*node)->DebugString());
+    return Status::Expected(msg);
+  }
+  *node = &(*node)->children[0];
+
+  // The next index in 'path' should be the key or the value.
+  if (idx + 1 < path.size()) {
+    DCHECK(path[idx + 1] == SchemaPathConstants::MAP_KEY ||
+           path[idx + 1] == SchemaPathConstants::MAP_VALUE);
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
+    const ColumnType& col_type, const SchemaPath& path, int idx) const {
+  if (!node.children.empty()) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[col_type.type];
+  if (type != node.element->type) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  return Status::OK();
+}
+
+}
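A quick illustration of the version-parsing rules implemented by ParquetFileVersion above (and exercised by the parquet-version-test.cc changes below). This is a minimal sketch, not part of the patch; it assumes "exec/parquet-metadata-utils.h" is on the include path and that the program links against the Impala backend:

// Sketch: expected ParquetFileVersion behavior for typical created_by strings.
// Unspecified version parts default to 0; trailing non-numeric suffixes are ignored.
#include <cassert>
#include "exec/parquet-metadata-utils.h"  // assumption: header available to this program

int main() {
  using impala::ParquetFileVersion;
  ParquetFileVersion v1("impala version 1.2.3");
  assert(v1.application == "impala");  // first token of created_by, lowercased
  assert(v1.VersionEq(1, 2, 3));
  ParquetFileVersion v2("parquet-mr version 1.2-cdh5");  // "1.2-cdh5" => {1, 2, 0}
  assert(v2.VersionEq(1, 2, 0));
  assert(v2.VersionLt(1, 2, 1));  // strictly-less-than comparison
  ParquetFileVersion v3("impala version 2.3.0-internal");
  assert(v3.is_impala_internal);  // "-internal" suffix on an Impala writer
  return 0;
}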
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
new file mode 100644
index 0000000..a07627e
--- /dev/null
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -0,0 +1,202 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+#define IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+
+#include <string>
+
+#include "runtime/descriptors.h"
+#include "gen-cpp/parquet_types.h"
+
+namespace impala {
+
+class RuntimeState;
+
+class ParquetMetadataUtils {
+ public:
+  /// Checks the version of the given file and returns a non-OK status if
+  /// Impala does not support that version.
+  static Status ValidateFileVersion(const parquet::FileMetaData& file_metadata,
+      const char* filename);
+
+  /// Validate column offsets by checking if the dictionary page comes before the data
+  /// pages and checking if the column offsets lie within the file.
+  static Status ValidateColumnOffsets(const string& filename, int64_t file_length,
+      const parquet::RowGroup& row_group);
+
+  /// Validates the column metadata to make sure this column is supported (e.g. encoding,
+  /// type, etc) and matches the type of given slot_desc.
+  static Status ValidateColumn(const parquet::FileMetaData& file_metadata,
+      const char* filename, int row_group_idx, int col_idx,
+      const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+      RuntimeState* state);
+};
+
+struct ParquetFileVersion {
+  /// Application that wrote the file. e.g. "IMPALA"
+  std::string application;
+
+  /// Version of the application that wrote the file, expressed in three parts
+  /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
+  /// ignored. e.g.:
+  /// "1.2.3"    => {1, 2, 3}
+  /// "1.2"      => {1, 2, 0}
+  /// "1.2-cdh5" => {1, 2, 0}
+  struct {
+    int major;
+    int minor;
+    int patch;
+  } version;
+
+  /// If true, this file was generated by an Impala internal release
+  bool is_impala_internal;
+
+  ParquetFileVersion() : is_impala_internal(false) { }
+
+  /// Parses the version from the created_by string
+  ParquetFileVersion(const std::string& created_by);
+
+  /// Returns true if version is strictly less than <major>.<minor>.<patch>
+  bool VersionLt(int major, int minor = 0, int patch = 0) const;
+
+  /// Returns true if version is equal to <major>.<minor>.<patch>
+  bool VersionEq(int major, int minor, int patch) const;
+};
+
+/// Internal representation of a Parquet schema (including nested-type columns).
+struct SchemaNode {
+  /// The corresponding schema element defined in the file metadata
+  const parquet::SchemaElement* element;
+
+  /// The index into the RowGroup::columns list if this column is materialized in the
+  /// file (i.e. it's a scalar type). -1 for nested types.
+  int col_idx;
+
+  /// The maximum definition level of this column, i.e., the definition level that
+  /// corresponds to a non-NULL value. Valid values are >= 0.
+  int max_def_level;
+
+  /// The maximum repetition level of this column. Valid values are >= 0.
+  int max_rep_level;
+
+  /// The definition level of the most immediate ancestor of this node with repeated
+  /// field repetition type. 0 if there are no repeated ancestors.
+  int def_level_of_immediate_repeated_ancestor;
+
+  /// Any nested schema nodes. Empty for non-nested types.
+  std::vector<SchemaNode> children;
+
+  SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1),
+      def_level_of_immediate_repeated_ancestor(-1) { }
+
+  std::string DebugString(int indent = 0) const;
+
+  bool is_repeated() const {
+    return element->repetition_type == parquet::FieldRepetitionType::REPEATED;
+  }
+};
+
+/// Utility class to resolve SchemaPaths (e.g., from a table descriptor) against a
+/// Parquet file schema. Supports resolution by field index or by field name.
+class ParquetSchemaResolver {
+ public:
+  ParquetSchemaResolver(const HdfsTableDescriptor& tbl_desc,
+      TParquetFallbackSchemaResolution::type fallback_schema_resolution)
+    : tbl_desc_(tbl_desc),
+      fallback_schema_resolution_(fallback_schema_resolution),
+      filename_(NULL) {
+  }
+
+  /// Parses the schema of the given file metadata into an internal schema
+  /// representation used in path resolution. Remembers the filename for error
+  /// reporting. Returns a non-OK status if the Parquet schema could not be parsed.
+  Status Init(const parquet::FileMetaData* file_metadata, const char* filename) {
+    DCHECK(filename != NULL);
+    filename_ = filename;
+    return CreateSchemaTree(file_metadata->schema, &schema_);
+  }
+
+  /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
+  /// does not exist in this file's schema, 'missing_field' is set to true and
+  /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
+  /// resolves to a collection position field, *pos_field is set to true. Otherwise
+  /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
+  /// against the file's schema (e.g., unrecognized collection schema).
+  ///
+  /// Tries to resolve assuming one-, two-, or three-level array encoding in
+  /// 'schema_'. Returns a bad status if resolution fails in all cases.
+  Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
+      bool* missing_field) const;
+
+ private:
+  /// Unflattens the schema metadata from a Parquet file metadata and converts it to our
+  /// SchemaNode representation. Returns the result in 'node' unless an error status is
+  /// returned. Does not set the slot_desc field of any SchemaNode.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      SchemaNode* node) const;
+
+  /// Recursive implementation used internally by the above CreateSchemaTree() function.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
+      SchemaNode* node) const;
+
+  /// The 'array_encoding' parameter determines whether to assume one-, two-, or
+  /// three-level array encoding. The returned status is not logged (i.e. it's an expected
+  /// error).
+  enum ArrayEncoding {
+    ONE_LEVEL,
+    TWO_LEVEL,
+    THREE_LEVEL
+  };
+
+  Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// Helper functions for ResolvePathHelper().
+
+  /// Advances 'node' to one of its children based on path[next_idx] and
+  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
+  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
+  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
+      int next_idx, SchemaNode* node, bool* missing_field) const;
+
+  /// Returns the index of 'node's child with 'name', or the number of children if not
+  /// found.
+  int FindChildWithName(SchemaNode* node, const string& name) const;
+
+  /// The ResolvePathHelper() logic for arrays.
+  Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for maps.
+  Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+      bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for scalars (just does validation since there's no
+  /// more actual work to be done).
+  Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
+      const SchemaPath& path, int idx) const;
+
+  const HdfsTableDescriptor& tbl_desc_;
+  const TParquetFallbackSchemaResolution::type fallback_schema_resolution_;
+  const char* filename_;
+
+  /// Root node of our internal schema representation populated in Init().
+  SchemaNode schema_;
+};
+
+} // impala namespace
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-scratch-tuple-batch.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-scratch-tuple-batch.h b/be/src/exec/parquet-scratch-tuple-batch.h
new file mode 100644
index 0000000..f2f9794
--- /dev/null
+++ b/be/src/exec/parquet-scratch-tuple-batch.h
@@ -0,0 +1,72 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+#define IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+
+#include "runtime/descriptors.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+/// Helper struct that holds a batch of tuples allocated from a mem pool, as well
+/// as state associated with iterating over its tuples and transferring
+/// them to an output batch in TransferScratchTuples().
+struct ScratchTupleBatch {
+  // Memory backing the batch of tuples. Allocated from batch's tuple data pool.
+  uint8_t* tuple_mem;
+  // Keeps track of the current tuple index.
+  int tuple_idx;
+  // Number of valid tuples in tuple_mem.
+  int num_tuples;
+  // Cached for convenient access.
+  const int tuple_byte_size;
+
+  // Helper batch for safely allocating tuple_mem from its tuple data pool using
+  // ResizeAndAllocateTupleBuffer().
+  RowBatch batch;
+
+  ScratchTupleBatch(
+      const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
+    : tuple_mem(NULL),
+      tuple_idx(0),
+      num_tuples(0),
+      tuple_byte_size(row_desc.GetRowSize()),
+      batch(row_desc, batch_size, mem_tracker) {
+    DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
+  }
+
+  Status Reset(RuntimeState* state) {
+    tuple_idx = 0;
+    num_tuples = 0;
+    // Buffer size is not needed.
+    int64_t buffer_size;
+    RETURN_IF_ERROR(batch.ResizeAndAllocateTupleBuffer(state, &buffer_size, &tuple_mem));
+    return Status::OK();
+  }
+
+  inline Tuple* GetTuple(int tuple_idx) const {
+    return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size);
+  }
+
+  inline MemPool* mem_pool() { return batch.tuple_data_pool(); }
+  inline int capacity() const { return batch.capacity(); }
+  inline uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
+  inline uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
+  inline bool AtEnd() const { return tuple_idx == num_tuples; }
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-version-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-version-test.cc b/be/src/exec/parquet-version-test.cc
index c159205..9ead7b6 100644
--- a/be/src/exec/parquet-version-test.cc
+++ b/be/src/exec/parquet-version-test.cc
@@ -17,7 +17,8 @@
 #include <iostream>
 #include <limits.h>
 #include <gtest/gtest.h>
-#include "exec/hdfs-parquet-scanner.h"
+#include "exec/parquet-metadata-utils.h"
+#include "util/cpu-info.h"
 
 #include "common/names.h"
 
@@ -26,7 +27,7 @@ namespace impala {
 void CheckVersionParse(const string& s, const string& expected_application,
     int expected_major, int expected_minor, int expected_patch,
     bool expected_is_internal) {
-  HdfsParquetScanner::FileVersion v(s);
+  ParquetFileVersion v(s);
   EXPECT_EQ(v.application, expected_application) << "String: " << s;
   EXPECT_EQ(v.version.major, expected_major) << "String: " << s;
   EXPECT_EQ(v.version.minor, expected_minor) << "String: " << s;
@@ -62,7 +63,7 @@ TEST(ParquetVersionTest, Parsing) {
 }
 
 TEST(ParquetVersionTest, Comparisons) {
-  HdfsParquetScanner::FileVersion v("foo version 1.2.3");
+  ParquetFileVersion v("foo version 1.2.3");
   EXPECT_TRUE(v.VersionEq(1, 2, 3));
   EXPECT_FALSE(v.VersionEq(1, 2, 4));
   EXPECT_TRUE(v.VersionLt(3, 2, 1));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exprs/expr-value.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-value.h b/be/src/exprs/expr-value.h
index 93b5f83..2cd2d58 100644
--- a/be/src/exprs/expr-value.h
+++ b/be/src/exprs/expr-value.h
@@ -17,7 +17,7 @@
 
 #include "runtime/collection-value.h"
 #include "runtime/decimal-value.h"
-#include "runtime/string-value.h"
+#include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "util/decimal-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index afbe3b6..7a467be 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -225,6 +225,20 @@ void RuntimeState::GetUnreportedErrors(ErrorLogMap* new_errors) {
   ClearErrorMap(error_log_);
 }
 
+Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
+  DCHECK_NE(message.error(), TErrorCode::OK);
+  // If either abort_on_error=true or the error requires that execution stop
+  // immediately, return an error status.
+  if (abort_on_error() ||
+      message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
+      message.error() == TErrorCode::CANCELLED) {
+    return Status(message);
+  }
+  // Otherwise, add the error to the error log and continue.
+  LogError(message);
+  return Status::OK();
+}
+
 void RuntimeState::LogMemLimitExceeded(const MemTracker* tracker,
     int64_t failed_allocation_size) {
   DCHECK_GE(failed_allocation_size, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index a5e560e..d6c766c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -209,6 +209,12 @@ class RuntimeState {
   /// be sent back to the coordinator
   void GetUnreportedErrors(ErrorLogMap* new_errors);
 
+  /// Given an error message, determine whether execution should be aborted and, if so,
+  /// return the corresponding error status. Otherwise, log the error and return
+  /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to
+  /// true or the error is not recoverable and should be handled upstream.
+  Status LogOrReturnError(const ErrorMsg& message);
+
   bool is_cancelled() const { return is_cancelled_; }
   void set_is_cancelled(bool v) { is_cancelled_ = v; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index c3f8c94..5ac96e6 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -269,6 +269,14 @@ string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path) {
   return ss.str();
 }
 
+string PrintSubPath(const TableDescriptor& tbl_desc, const SchemaPath& path,
+    int end_path_idx) {
+  DCHECK_GE(end_path_idx, 0);
+  SchemaPath::const_iterator subpath_end = path.begin() + end_path_idx + 1;
+  SchemaPath subpath(path.begin(), subpath_end);
+  return PrintPath(tbl_desc, subpath);
+}
+
 string PrintNumericPath(const SchemaPath& path) {
   stringstream ss;
   ss << "[";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index c9550dc..48e8c27 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -70,8 +70,12 @@ std::string PrintAsHex(const char* bytes, int64_t len);
 std::string PrintTMetricKind(const TMetricKind::type& type);
 std::string PrintTUnit(const TUnit::type& type);
 std::string PrintTImpalaQueryOptions(const TImpalaQueryOptions::type& type);
+
 /// Returns the fully qualified path, e.g. "database.table.array_col.item.field"
 std::string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path);
+/// Same as PrintPath(), but truncates the path after the given 'end_path_idx'.
+std::string PrintSubPath(const TableDescriptor& tbl_desc, const SchemaPath& path,
+    int end_path_idx);
 /// Returns the numeric path without column/field names, e.g. "[0,1,2]"
 std::string PrintNumericPath(const SchemaPath& path);
 
@@ -98,6 +102,20 @@ std::string GetVersionString(bool compact = false);
 /// for recursive calls.
 std::string GetStackTrace();
 
+// FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed
+// input file. They differentiate these cases from DCHECKs, which indicate conditions that
+// are true unless there's a bug in Impala. We would ideally always return a bad Status
+// instead of failing a FILE_CHECK, but in many cases we use FILE_CHECK instead because
+// there's a performance cost to doing the check in a release build, or just due to legacy
+// code.
+#define FILE_CHECK(a) DCHECK(a)
+#define FILE_CHECK_EQ(a, b) DCHECK_EQ(a, b)
+#define FILE_CHECK_NE(a, b) DCHECK_NE(a, b)
+#define FILE_CHECK_GT(a, b) DCHECK_GT(a, b)
+#define FILE_CHECK_LT(a, b) DCHECK_LT(a, b)
+#define FILE_CHECK_GE(a, b) DCHECK_GE(a, b)
+#define FILE_CHECK_LE(a, b) DCHECK_LE(a, b)
+
 }
 
 #endif