github-actions[bot] commented on code in PR #63192: URL: https://github.com/apache/doris/pull/63192#discussion_r3253037880
########## be/src/format/parquet/parquet_nested_column_utils.cpp: ########## @@ -0,0 +1,459 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_nested_column_utils.h" + +#include <algorithm> +#include <cctype> +#include <string_view> +#include <unordered_map> +#include <utility> + +#include "core/data_type/data_type_nullable.h" +#include "format/parquet/schema_desc.h" + +namespace doris { +namespace { + +enum class NestedPathMode { + NAME, + FIELD_ID, +}; + +void add_column_id_range(const FieldSchema& field_schema, std::set<uint64_t>& column_ids) { + const uint64_t start_id = field_schema.get_column_id(); + const uint64_t max_column_id = field_schema.get_max_column_id(); + for (uint64_t id = start_id; id <= max_column_id; ++id) { + column_ids.insert(id); + } +} + +const FieldSchema* find_child_by_structural_name(const FieldSchema& field_schema, + std::string_view name) { + std::string lower_name(name); + std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), + [](unsigned char c) { return static_cast<char>(std::tolower(c)); }); + for (const auto& child : field_schema.children) { + if (child.name == name || child.lower_case_name == lower_name) { + return &child; + } 
+ } + return nullptr; +} + +const FieldSchema* find_child_by_exact_name(const FieldSchema& field_schema, + std::string_view name) { + for (const auto& child : field_schema.children) { + if (child.name == name) { + return &child; + } + } + return nullptr; +} + +void add_variant_metadata(const FieldSchema& variant_field, std::set<uint64_t>& column_ids) { + if (const auto* metadata = find_child_by_structural_name(variant_field, "metadata")) { + add_column_id_range(*metadata, column_ids); + } +} + +void add_variant_value(const FieldSchema& variant_field, std::set<uint64_t>& column_ids) { + add_variant_metadata(variant_field, column_ids); + if (const auto* value = find_child_by_structural_name(variant_field, "value")) { + add_column_id_range(*value, column_ids); + } +} + +struct VariantColumnIdExtractionResult { + bool has_child_columns = false; + bool needs_metadata = false; +}; + +bool is_shredded_variant_field(const FieldSchema& field_schema) { + bool has_value = false; + const FieldSchema* typed_value = nullptr; + for (const auto& child : field_schema.children) { + if (child.lower_case_name == "value") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_value = true; + continue; + } + if (child.lower_case_name == "typed_value") { + typed_value = &child; + continue; + } + return false; + } + if (has_value) { + return true; + } + if (typed_value == nullptr) { + return false; + } + const auto type = remove_nullable(typed_value->data_type); + return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY; +} + +bool add_shredded_variant_field_value(const FieldSchema& shredded_field, + std::set<uint64_t>& column_ids) { + if (const auto* value = find_child_by_structural_name(shredded_field, "value")) { + add_column_id_range(*value, column_ids); + return true; + } + return false; +} + +bool is_variant_array_subscript(std::string_view path) { + return !path.empty() && + std::all_of(path.begin(), path.end(), 
[](unsigned char c) { return std::isdigit(c); }); +} + +bool is_terminal_variant_meta_component(std::string_view path) { + return path == "NULL" || path == "OFFSET"; +} + +const std::vector<std::string>& effective_variant_path(const std::vector<std::string>& raw_path, + std::vector<std::string>& stripped_path) { + if (!raw_path.empty() && is_terminal_variant_meta_component(raw_path.back())) { + stripped_path.assign(raw_path.begin(), raw_path.end() - 1); + return stripped_path; + } + return raw_path; +} + +bool contains_inherited_metadata_value(const FieldSchema& field_schema) { + if (is_shredded_variant_field(field_schema) && + find_child_by_structural_name(field_schema, "value") != nullptr) { + return true; + } + return std::any_of( + field_schema.children.begin(), field_schema.children.end(), + [](const FieldSchema& child) { return contains_inherited_metadata_value(child); }); +} + +VariantColumnIdExtractionResult extract_variant_typed_nested_column_ids( + const FieldSchema& field_schema, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids); + +VariantColumnIdExtractionResult extract_shredded_variant_field_ids( + const FieldSchema& shredded_field, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids) { + const auto* typed_value = find_child_by_structural_name(shredded_field, "typed_value"); + VariantColumnIdExtractionResult result; + + for (const auto& raw_path : paths) { + std::vector<std::string> stripped_path; + const auto& path = effective_variant_path(raw_path, stripped_path); + if (path.empty()) { + add_column_id_range(shredded_field, column_ids); + result.has_child_columns = true; + result.needs_metadata |= contains_inherited_metadata_value(shredded_field); + continue; + } + + bool has_selected_columns = add_shredded_variant_field_value(shredded_field, column_ids); + result.needs_metadata |= has_selected_columns; + if (typed_value != nullptr) { + const auto typed_value_type = 
remove_nullable(typed_value->data_type); + if (typed_value_type->get_primitive_type() != TYPE_STRUCT) { + auto child_result = + extract_variant_typed_nested_column_ids(*typed_value, {path}, column_ids); + if (child_result.has_child_columns) { + column_ids.insert(typed_value->get_column_id()); + result.needs_metadata |= child_result.needs_metadata; + has_selected_columns = true; + } + } else if (const auto* typed_child = find_child_by_exact_name(*typed_value, path[0])) { + if (path.size() == 1) { + add_column_id_range(*typed_child, column_ids); + result.needs_metadata |= contains_inherited_metadata_value(*typed_child); + column_ids.insert(typed_value->get_column_id()); + has_selected_columns = true; + } else { + std::vector<std::vector<std::string>> child_paths { + std::vector<std::string>(path.begin() + 1, path.end())}; + auto child_result = extract_variant_typed_nested_column_ids( + *typed_child, child_paths, column_ids); + if (child_result.has_child_columns) { + column_ids.insert(typed_value->get_column_id()); + result.needs_metadata |= child_result.needs_metadata; + has_selected_columns = true; + } + } + } + } + result.has_child_columns |= has_selected_columns; + } + + if (result.has_child_columns) { + column_ids.insert(shredded_field.get_column_id()); + } + return result; +} + +VariantColumnIdExtractionResult extract_variant_nested_column_ids( + const FieldSchema& variant_field, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids) { + const auto* typed_value = find_child_by_structural_name(variant_field, "typed_value"); + VariantColumnIdExtractionResult result; + + for (const auto& raw_path : paths) { + std::vector<std::string> stripped_path; + const auto& path = effective_variant_path(raw_path, stripped_path); + if (path.empty()) { + add_column_id_range(variant_field, column_ids); + result.has_child_columns = true; + continue; + } + + VariantColumnIdExtractionResult typed_result; + if (typed_value != nullptr) { + if (const 
auto* typed_child = find_child_by_exact_name(*typed_value, path[0])) { + if (path.size() == 1) { + add_column_id_range(*typed_child, column_ids); + typed_result.has_child_columns = true; + typed_result.needs_metadata = contains_inherited_metadata_value(*typed_child); + } else { Review Comment: Top-level typed VARIANT arrays are not pruned correctly. For a typed-only root layout like `v (VARIANT) { metadata, typed_value: array<struct<n:int>> }` and an access such as `v[1]['n']`, `path[0]` is the literal subscript (`"1"`) while the `typed_value` child is the LIST element, so this exact child lookup fails and line 248 falls back to `v.value`. Typed-only files have no top-level `value`, so the selected `v.typed_value.list.element.n` leaf is missed. The existing array comments cover arrays after entering a shredded field; this is the distinct root-typed-array path. Please route non-struct `typed_value` through `extract_variant_typed_nested_column_ids()` the same way the shredded-field helper does, with coverage for root typed-only VARIANT arrays. ########## be/src/format/parquet/vparquet_reader.cpp: ########## @@ -1025,8 +1137,12 @@ bool ParquetReader::_is_misaligned_range_group(const tparquet::RowGroup& row_gro Review Comment: This complex-type dispatch does not unwrap nullable types, so an optional top-level VARIANT can still fall into the scalar prefetch branch below. `parse_variant_field()` wraps optional VARIANT as `Nullable(Variant)`, making `get_primitive_type()` return `TYPE_NULLABLE`; `_generate_random_access_ranges()` then uses the group field's `physical_column_index` (`-1`) at `row_group.columns[...]` before the row-group reader is even created. This is distinct from the existing `VariantColumnReader::init()` nullable-struct thread because the crash can happen during random-access range generation. Please dispatch on `remove_nullable(field->data_type)` here and add optional top-level VARIANT coverage with prefetch enabled. 
########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,844 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <deque> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +#include "core/column/column_variant.h" +#include "core/value/jsonb_value.h" +#include "exec/common/variant_util.h" + +namespace doris::parquet { + +std::string format_variant_uuid(const uint8_t* ptr) { + static constexpr char hex[] = "0123456789abcdef"; + std::string uuid; + uuid.reserve(36); + for (int i = 0; i < 16; ++i) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + uuid.push_back('-'); + } + uuid.push_back(hex[ptr[i] >> 4]); + uuid.push_back(hex[ptr[i] & 0x0f]); + } + return uuid; +} + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +struct VariantObjectLayout { + std::vector<uint64_t> field_ids; + std::vector<uint64_t> field_offsets; + const uint8_t* fields = nullptr; + uint64_t total_size = 0; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i 
< size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +__int128 read_signed_int128_le(const uint8_t* ptr) { + unsigned __int128 unsigned_value = 0; + for (int i = 15; i >= 0; --i) { + unsigned_value <<= 8; + unsigned_value |= ptr[i]; + } + static constexpr unsigned __int128 sign_bit = static_cast<unsigned __int128>(1) << 127; + if ((unsigned_value & sign_bit) == 0) { + return static_cast<__int128>(unsigned_value); + } + static constexpr __int128 signed_half_range = static_cast<__int128>(1) << 126; + return (static_cast<__int128>(unsigned_value & (sign_bit - 1)) - signed_half_range) - + signed_half_range; +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + if (size > static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +bool variant_string_less(std::string_view lhs, std::string_view rhs) { + return std::lexicographical_compare( + lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), [](char left, char right) { + return static_cast<unsigned char>(left) < static_cast<unsigned 
char>(right); + }); +} + +bool is_valid_utf8(std::string_view value) { + const auto* data = reinterpret_cast<const uint8_t*>(value.data()); + const auto* end = data + value.size(); + while (data < end) { + const uint8_t first = *data++; + if (first <= 0x7f) { + continue; + } + + uint32_t code_point = 0; + size_t continuation_bytes = 0; + if (first >= 0xc2 && first <= 0xdf) { + code_point = first & 0x1f; + continuation_bytes = 1; + } else if (first >= 0xe0 && first <= 0xef) { + code_point = first & 0x0f; + continuation_bytes = 2; + } else if (first >= 0xf0 && first <= 0xf4) { + code_point = first & 0x07; + continuation_bytes = 3; + } else { + return false; + } + + if (static_cast<size_t>(end - data) < continuation_bytes) { + return false; + } + for (size_t i = 0; i < continuation_bytes; ++i) { + const uint8_t byte = *data++; + if ((byte & 0xc0) != 0x80) { + return false; + } + code_point = (code_point << 6) | (byte & 0x3f); + } + + if ((continuation_bytes == 2 && code_point < 0x800) || + (continuation_bytes == 3 && code_point < 0x10000) || + (code_point >= 0xd800 && code_point <= 0xdfff) || code_point > 0x10ffff) { + return false; + } + } + return true; +} + +Status require_valid_utf8(std::string_view value, std::string_view context) { + if (!is_valid_utf8(value)) { + return Status::Corruption("Invalid Parquet VARIANT {} UTF-8 string", context); + } + return Status::OK(); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field 
offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? 
physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { + json->push_back('"'); + static constexpr char hex[] = "0123456789abcdef"; + for (unsigned char c : value) { + switch (c) { + case '"': + json->append("\\\""); + break; + case '\\': + json->append("\\\\"); + break; + case '\b': + json->append("\\b"); + break; + case '\f': + json->append("\\f"); + break; + case '\n': + json->append("\\n"); + break; + case '\r': + json->append("\\r"); + break; + case '\t': + json->append("\\t"); + break; + default: + if (c < 0x20) { + json->append("\\u00"); + json->push_back(hex[c >> 4]); + json->push_back(hex[c & 0x0f]); + } else { + json->push_back(static_cast<char>(c)); + } + break; + } + } + json->push_back('"'); +} + +template <typename T> +Status append_floating_json(T value, std::string* json) { + if (!std::isfinite(value)) { + return Status::NotSupported( + "Parquet VARIANT non-finite floating point value is not supported"); + } + std::ostringstream oss; + oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value; + json->append(oss.str()); + return Status::OK(); +} + +std::string int128_to_string(__int128 value) { + if (value == 0) { + return "0"; + } + bool negative = value < 0; + unsigned __int128 unsigned_value = negative ? static_cast<unsigned __int128>(-(value + 1)) + 1 + : static_cast<unsigned __int128>(value); + std::string digits; + while (unsigned_value > 0) { + digits.push_back(static_cast<char>('0' + unsigned_value % 10)); + unsigned_value /= 10; + } + if (negative) { + digits.push_back('-'); + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +void append_decimal_json(__int128 unscaled, int scale, std::string* json) { + std::string value = int128_to_string(unscaled); + bool negative = !value.empty() && value[0] == '-'; + std::string digits = negative ? 
value.substr(1) : value; + if (scale == 0) { + json->append(value); + return; + } + if (scale > 0) { + if (digits.size() <= static_cast<size_t>(scale)) { + digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), '0'); + } + digits.insert(digits.end() - scale, '.'); + if (negative) { + json->push_back('-'); + } + json->append(digits); + return; + } + if (negative) { + json->push_back('-'); + } + json->append(digits); + json->append(static_cast<size_t>(-scale), '0'); +} + +Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const uint8_t* end, + std::string* json, const uint8_t** next); +Status decode_value(const uint8_t* ptr, const uint8_t* end, const VariantMetadata& metadata, + std::string* json, const uint8_t** next); + +void append_uuid_json(const uint8_t* ptr, std::string* json) { + json->push_back('"'); + json->append(format_variant_uuid(ptr)); + json->push_back('"'); +} + +Status make_jsonb_field(std::string_view json, FieldWithDataType* value) { + JsonBinaryValue jsonb_value; + RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size())); + value->field = + Field::create_field<TYPE_JSONB>(JsonbField(jsonb_value.value(), jsonb_value.size())); + value->base_scalar_type_id = TYPE_JSONB; + return Status::OK(); +} + +Status insert_empty_object_marker(const PathInData& path, VariantMap* values) { + FieldWithDataType value; + RETURN_IF_ERROR(make_jsonb_field("{}", &value)); + (*values)[path] = std::move(value); + return Status::OK(); +} + +Status parse_json_to_variant_map(std::string_view json, const PathInData& prefix, + VariantMap* values) { + auto parsed_column = ColumnVariant::create(0, false); + ParseConfig parse_config; + StringRef json_ref(json.data(), json.size()); + RETURN_IF_CATCH_EXCEPTION( + variant_util::parse_json_to_variant(*parsed_column, json_ref, nullptr, parse_config)); + Field parsed = (*parsed_column)[0]; + if (parsed.is_null()) { + return Status::OK(); + } + + PathInDataBuilder path; + 
path.append(prefix.get_parts(), false); + for (auto& [parsed_path, value] : parsed.get<TYPE_VARIANT>()) { + path.append(parsed_path.get_parts(), false); + (*values)[path.build()] = std::move(value); + for (size_t i = 0; i < parsed_path.get_parts().size(); ++i) { + path.pop_back(); + } + } + return Status::OK(); +} + +Status read_object_layout(uint8_t value_header, const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, VariantObjectLayout* layout) { + int field_offset_size = (value_header & 0x03) + 1; + int field_id_size = ((value_header >> 2) & 0x03) + 1; + int num_elements_size = (value_header & 0x10) != 0 ? 4 : 1; + + RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "object element count")); + uint64_t num_elements = read_unsigned_le(ptr, num_elements_size); + ptr += num_elements_size; + + RETURN_IF_ERROR( + require_available_entries(ptr, end, num_elements, field_id_size, "object field ids")); + layout->field_ids.resize(num_elements); + for (uint64_t i = 0; i < num_elements; ++i) { + layout->field_ids[i] = read_unsigned_le(ptr, field_id_size); + ptr += field_id_size; + if (layout->field_ids[i] >= metadata.dictionary.size()) { + return Status::Corruption("Invalid Parquet VARIANT object field id {}", + layout->field_ids[i]); + } + if (i > 0 && !variant_string_less(metadata.dictionary[layout->field_ids[i - 1]], + metadata.dictionary[layout->field_ids[i]])) { + return Status::Corruption("Invalid Parquet VARIANT object field names"); + } + } + + RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, field_offset_size, + "object field offsets")); + layout->field_offsets.resize(num_elements + 1); + for (uint64_t i = 0; i <= num_elements; ++i) { + layout->field_offsets[i] = read_unsigned_le(ptr, field_offset_size); + ptr += field_offset_size; + } + + layout->total_size = layout->field_offsets.back(); + layout->fields = ptr; + RETURN_IF_ERROR( + require_available(layout->fields, end, layout->total_size, "object field 
values")); + RETURN_IF_ERROR( + validate_array_field_offsets(layout->field_offsets, layout->total_size, "object")); + return Status::OK(); +} + +Status decode_value_to_variant_map(const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, PathInDataBuilder* path, + VariantMap* values, std::deque<std::string>* string_values, + const uint8_t** next); + +Status decode_primitive_to_variant_map(uint8_t primitive_header, const uint8_t* ptr, + const uint8_t* end, const VariantMetadata&, + PathInDataBuilder* path, VariantMap* values, + std::deque<std::string>* string_values, + const uint8_t** next) { + if (primitive_header == 15) { + RETURN_IF_ERROR(require_available(ptr, end, 4, "binary length")); + uint64_t size = read_unsigned_le(ptr, 4); + ptr += 4; + RETURN_IF_ERROR(require_available(ptr, end, size, "binary value")); + string_values->emplace_back(reinterpret_cast<const char*>(ptr), static_cast<size_t>(size)); + (*values)[path->build()] = FieldWithDataType { + .field = Field::create_field<TYPE_VARBINARY>(StringView(string_values->back())), + .base_scalar_type_id = TYPE_VARBINARY}; + *next = ptr + size; + return Status::OK(); + } + + std::string json; + RETURN_IF_ERROR(decode_primitive(primitive_header, ptr, end, &json, next)); + return parse_json_to_variant_map(json, path->build(), values); +} + +Status decode_object_to_variant_map(uint8_t value_header, const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, PathInDataBuilder* path, + VariantMap* values, std::deque<std::string>* string_values, + const uint8_t** next) { + VariantObjectLayout layout; + RETURN_IF_ERROR(read_object_layout(value_header, ptr, end, metadata, &layout)); + + if (layout.field_ids.empty()) { + RETURN_IF_ERROR(insert_empty_object_marker(path->build(), values)); + } + + for (uint64_t i = 0; i < layout.field_ids.size(); ++i) { + const uint8_t* child_begin = layout.fields + layout.field_offsets[i]; + const uint8_t* child_end = layout.fields + layout.field_offsets[i 
+ 1]; + const uint8_t* child_next = nullptr; + path->append(metadata.dictionary[layout.field_ids[i]], false); + RETURN_IF_ERROR(decode_value_to_variant_map(child_begin, child_end, metadata, path, values, + string_values, &child_next)); + path->pop_back(); + if (child_next != child_end) { + return Status::Corruption("Invalid Parquet VARIANT object child value length"); + } + } + *next = layout.fields + layout.total_size; + return Status::OK(); +} + +Status decode_value_to_variant_map(const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, PathInDataBuilder* path, + VariantMap* values, std::deque<std::string>* string_values, + const uint8_t** next) { + RETURN_IF_ERROR(require_available(ptr, end, 1, "value")); + uint8_t value_metadata = *ptr++; + uint8_t basic_type = value_metadata & 0x03; + uint8_t value_header = value_metadata >> 2; + + switch (basic_type) { + case 0: + return decode_primitive_to_variant_map(value_header, ptr, end, metadata, path, values, + string_values, next); + case 2: + return decode_object_to_variant_map(value_header, ptr, end, metadata, path, values, + string_values, next); + case 1: + case 3: { + std::string json; + RETURN_IF_ERROR(decode_value(ptr - 1, end, metadata, &json, next)); + return parse_json_to_variant_map(json, path->build(), values); + } + default: + return Status::Corruption("Unsupported Parquet VARIANT basic type {}", basic_type); + } Review Comment: Arrays containing residual binary primitives still fail on the `VariantMap` path. Object children use `decode_primitive_to_variant_map()` and preserve primitive header 15 as `TYPE_VARBINARY`, but arrays go through `decode_value()` and JSON reparsing; `decode_primitive()` returns `NotSupported` for the same binary primitive. A valid residual value such as `[binary(0xff)]` therefore fails or cannot be materialized even though `{"b": binary(0xff)}` works. 
Please decode arrays directly into `VariantMap` (or add binary-aware array handling) and cover residual arrays containing binary values. ########## be/src/format/parquet/vparquet_column_reader.cpp: ########## @@ -103,6 +127,1395 @@ static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offset } } +static constexpr int64_t UNIX_EPOCH_DAYNR = 719528; +static constexpr int64_t MICROS_PER_SECOND = 1000000; + +static int64_t variant_date_value(const VecDateTimeValue& value) { + return value.daynr() - UNIX_EPOCH_DAYNR; +} + +static int64_t variant_date_value(const DateV2Value<DateV2ValueType>& value) { + return value.daynr() - UNIX_EPOCH_DAYNR; +} + +static int64_t variant_datetime_value(const VecDateTimeValue& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND; +} + +static int64_t variant_datetime_value(const DateV2Value<DateTimeV2ValueType>& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND + value.microsecond(); +} + +static int64_t variant_datetime_value(const TimestampTzValue& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND + value.microsecond(); +} + +static int find_child_idx(const FieldSchema& field, std::string_view name) { + for (int i = 0; i < field.children.size(); ++i) { + if (field.children[i].lower_case_name == name) { + return i; + } + } + return -1; +} + +static bool is_variant_wrapper_typed_value_child(const FieldSchema& field) { + auto type = remove_nullable(field.data_type); + return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY; +} + +static bool is_variant_wrapper_field(const FieldSchema& field, + bool allow_scalar_typed_value_only_wrapper, + bool allow_value_only_wrapper = false) { + auto type = remove_nullable(field.data_type); + if (type->get_primitive_type()
!= TYPE_STRUCT && type->get_primitive_type() != TYPE_VARIANT) { + return false; + } + + bool has_metadata = false; + bool has_value = false; + const FieldSchema* typed_value = nullptr; + for (const auto& child : field.children) { + if (child.lower_case_name == "metadata") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_metadata = true; + continue; + } + if (child.lower_case_name == "value") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_value = true; + continue; + } + if (child.lower_case_name == "typed_value") { + typed_value = &child; + continue; + } + return false; + } + if (has_metadata && has_value) { + return type->get_primitive_type() == TYPE_VARIANT || typed_value != nullptr; + } + if (has_value) { + return typed_value != nullptr || allow_value_only_wrapper; + } + return typed_value != nullptr && (allow_scalar_typed_value_only_wrapper || + is_variant_wrapper_typed_value_child(*typed_value)); +} Review Comment: Allowing any value-only group to be treated as a residual wrapper makes ordinary typed objects ambiguous. A valid typed-only object like `typed_value.obj { optional binary value; }`, representing `{"obj": {"value": "user-value"}}`, satisfies `has_value` with no `typed_value`; when row-wise reconstruction calls `is_variant_wrapper_field(..., allow_value_only_wrapper=true)`, those user bytes are decoded as Parquet VARIANT residual bytes instead of emitted as `obj.value`. The existing thread covered user fields named both `metadata` and `value`; this is the new value-only false positive introduced for residual-only wrappers. Please distinguish real shredded residual wrappers from ordinary user structs, or avoid classifying a lone user `value` field as a wrapper, with coverage for a typed-only object whose only field is named `value`. 
########## be/src/format/parquet/parquet_nested_column_utils.cpp: ########## @@ -0,0 +1,459 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_nested_column_utils.h" + +#include <algorithm> +#include <cctype> +#include <string_view> +#include <unordered_map> +#include <utility> + +#include "core/data_type/data_type_nullable.h" +#include "format/parquet/schema_desc.h" + +namespace doris { +namespace { + +enum class NestedPathMode { + NAME, + FIELD_ID, +}; + +void add_column_id_range(const FieldSchema& field_schema, std::set<uint64_t>& column_ids) { + const uint64_t start_id = field_schema.get_column_id(); + const uint64_t max_column_id = field_schema.get_max_column_id(); + for (uint64_t id = start_id; id <= max_column_id; ++id) { + column_ids.insert(id); + } +} + +const FieldSchema* find_child_by_structural_name(const FieldSchema& field_schema, + std::string_view name) { + std::string lower_name(name); + std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), + [](unsigned char c) { return static_cast<char>(std::tolower(c)); }); + for (const auto& child : field_schema.children) { + if (child.name == name || child.lower_case_name == lower_name) { + return &child; + } 
+ } + return nullptr; +} + +const FieldSchema* find_child_by_exact_name(const FieldSchema& field_schema, + std::string_view name) { + for (const auto& child : field_schema.children) { + if (child.name == name) { + return &child; + } + } + return nullptr; +} + +void add_variant_metadata(const FieldSchema& variant_field, std::set<uint64_t>& column_ids) { + if (const auto* metadata = find_child_by_structural_name(variant_field, "metadata")) { + add_column_id_range(*metadata, column_ids); + } +} + +void add_variant_value(const FieldSchema& variant_field, std::set<uint64_t>& column_ids) { + add_variant_metadata(variant_field, column_ids); + if (const auto* value = find_child_by_structural_name(variant_field, "value")) { + add_column_id_range(*value, column_ids); + } +} + +struct VariantColumnIdExtractionResult { + bool has_child_columns = false; + bool needs_metadata = false; +}; + +bool is_shredded_variant_field(const FieldSchema& field_schema) { + bool has_value = false; + const FieldSchema* typed_value = nullptr; + for (const auto& child : field_schema.children) { + if (child.lower_case_name == "value") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_value = true; + continue; + } + if (child.lower_case_name == "typed_value") { + typed_value = &child; + continue; + } + return false; + } + if (has_value) { + return true; + } + if (typed_value == nullptr) { + return false; + } + const auto type = remove_nullable(typed_value->data_type); + return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY; +} + +bool add_shredded_variant_field_value(const FieldSchema& shredded_field, + std::set<uint64_t>& column_ids) { + if (const auto* value = find_child_by_structural_name(shredded_field, "value")) { + add_column_id_range(*value, column_ids); + return true; + } + return false; +} + +bool is_variant_array_subscript(std::string_view path) { + return !path.empty() && + std::all_of(path.begin(), path.end(), 
[](unsigned char c) { return std::isdigit(c); }); +} + +bool is_terminal_variant_meta_component(std::string_view path) { + return path == "NULL" || path == "OFFSET"; +} + +const std::vector<std::string>& effective_variant_path(const std::vector<std::string>& raw_path, + std::vector<std::string>& stripped_path) { + if (!raw_path.empty() && is_terminal_variant_meta_component(raw_path.back())) { + stripped_path.assign(raw_path.begin(), raw_path.end() - 1); + return stripped_path; + } + return raw_path; +} + +bool contains_inherited_metadata_value(const FieldSchema& field_schema) { + if (is_shredded_variant_field(field_schema) && + find_child_by_structural_name(field_schema, "value") != nullptr) { + return true; + } + return std::any_of( + field_schema.children.begin(), field_schema.children.end(), + [](const FieldSchema& child) { return contains_inherited_metadata_value(child); }); +} + +VariantColumnIdExtractionResult extract_variant_typed_nested_column_ids( + const FieldSchema& field_schema, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids); + +VariantColumnIdExtractionResult extract_shredded_variant_field_ids( + const FieldSchema& shredded_field, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids) { + const auto* typed_value = find_child_by_structural_name(shredded_field, "typed_value"); + VariantColumnIdExtractionResult result; + + for (const auto& raw_path : paths) { + std::vector<std::string> stripped_path; + const auto& path = effective_variant_path(raw_path, stripped_path); + if (path.empty()) { + add_column_id_range(shredded_field, column_ids); + result.has_child_columns = true; + result.needs_metadata |= contains_inherited_metadata_value(shredded_field); + continue; + } + + bool has_selected_columns = add_shredded_variant_field_value(shredded_field, column_ids); + result.needs_metadata |= has_selected_columns; + if (typed_value != nullptr) { + const auto typed_value_type = 
remove_nullable(typed_value->data_type); + if (typed_value_type->get_primitive_type() != TYPE_STRUCT) { + auto child_result = + extract_variant_typed_nested_column_ids(*typed_value, {path}, column_ids); + if (child_result.has_child_columns) { + column_ids.insert(typed_value->get_column_id()); + result.needs_metadata |= child_result.needs_metadata; + has_selected_columns = true; + } + } else if (const auto* typed_child = find_child_by_exact_name(*typed_value, path[0])) { + if (path.size() == 1) { + add_column_id_range(*typed_child, column_ids); + result.needs_metadata |= contains_inherited_metadata_value(*typed_child); + column_ids.insert(typed_value->get_column_id()); + has_selected_columns = true; + } else { + std::vector<std::vector<std::string>> child_paths { + std::vector<std::string>(path.begin() + 1, path.end())}; + auto child_result = extract_variant_typed_nested_column_ids( + *typed_child, child_paths, column_ids); + if (child_result.has_child_columns) { + column_ids.insert(typed_value->get_column_id()); + result.needs_metadata |= child_result.needs_metadata; + has_selected_columns = true; + } + } + } + } + result.has_child_columns |= has_selected_columns; + } + + if (result.has_child_columns) { + column_ids.insert(shredded_field.get_column_id()); + } + return result; +} + +VariantColumnIdExtractionResult extract_variant_nested_column_ids( + const FieldSchema& variant_field, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids) { + const auto* typed_value = find_child_by_structural_name(variant_field, "typed_value"); + VariantColumnIdExtractionResult result; + + for (const auto& raw_path : paths) { + std::vector<std::string> stripped_path; + const auto& path = effective_variant_path(raw_path, stripped_path); + if (path.empty()) { + add_column_id_range(variant_field, column_ids); + result.has_child_columns = true; + continue; + } + + VariantColumnIdExtractionResult typed_result; + if (typed_value != nullptr) { + if (const 
auto* typed_child = find_child_by_exact_name(*typed_value, path[0])) { + if (path.size() == 1) { + add_column_id_range(*typed_child, column_ids); + typed_result.has_child_columns = true; + typed_result.needs_metadata = contains_inherited_metadata_value(*typed_child); + } else { + std::vector<std::vector<std::string>> child_paths { + std::vector<std::string>(path.begin() + 1, path.end())}; + typed_result = extract_variant_typed_nested_column_ids(*typed_child, + child_paths, column_ids); + } + if (typed_result.has_child_columns) { + column_ids.insert(typed_value->get_column_id()); + if (typed_result.needs_metadata) { + add_variant_metadata(variant_field, column_ids); + } + } + } + } + + if (!typed_result.has_child_columns) { + add_variant_value(variant_field, column_ids); + } + result.has_child_columns = true; + } + + if (result.has_child_columns) { + column_ids.insert(variant_field.get_column_id()); + } + return result; +} + +VariantColumnIdExtractionResult extract_variant_typed_nested_column_ids( + const FieldSchema& field_schema, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids) { + if (field_schema.data_type->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { + return extract_variant_nested_column_ids(field_schema, paths, column_ids); + } + if (is_shredded_variant_field(field_schema)) { + return extract_shredded_variant_field_ids(field_schema, paths, column_ids); + } + + VariantColumnIdExtractionResult result; + std::unordered_map<std::string, std::vector<std::vector<std::string>>> child_paths_by_name; + for (const auto& path : paths) { + if (path.empty()) { + add_column_id_range(field_schema, column_ids); + result.has_child_columns = true; + result.needs_metadata |= contains_inherited_metadata_value(field_schema); + continue; + } + std::vector<std::string> remaining; + if (path.size() > 1) { + remaining.assign(path.begin() + 1, path.end()); + } + const bool is_list = + field_schema.data_type->get_primitive_type() == 
PrimitiveType::TYPE_ARRAY; + const std::string child_key = + is_list && is_variant_array_subscript(path[0]) ? "*" : path[0]; + child_paths_by_name[child_key].push_back(std::move(remaining)); + } + + for (uint64_t i = 0; i < field_schema.children.size(); ++i) { + const auto& child = field_schema.children[i]; + std::string child_name; + + const bool is_list = + field_schema.data_type->get_primitive_type() == PrimitiveType::TYPE_ARRAY; + const bool is_map = field_schema.data_type->get_primitive_type() == PrimitiveType::TYPE_MAP; + if (is_list) { + child_name = "*"; + } else if (is_map) { + if (i == 0) { + child_name = "KEYS"; + } else { + child_name = i == 1 ? "VALUES" : ""; + } Review Comment: Typed MAP shards under VARIANT paths are not reachable through normal VARIANT key access. If a shredded field is typed as `map<string, struct<n:int>>`, FE sends a literal path such as `[attrs, k, n]`. Once recursion reaches the MAP, this code matches only the synthetic `KEYS`/`VALUES` children and never maps the literal key `k` to a map-element lookup. The caller then falls back to the residual/top-level `value` column, so a typed-only MAP shard returns missing data. This is separate from the existing LIST-subscript threads. Please normalize VARIANT literal-key access on typed MAP values so that it reads the map keys plus the corresponding values subtree, and add pruning test coverage for typed-only MAP shards. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
