github-actions[bot] commented on code in PR #63192: URL: https://github.com/apache/doris/pull/63192#discussion_r3252894731
########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,576 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +namespace doris::parquet { + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i < size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +__int128 read_signed_int128_le(const uint8_t* ptr) { + unsigned __int128 unsigned_value = 0; + for (int i = 15; i >= 0; --i) { + unsigned_value <<= 8; + unsigned_value |= ptr[i]; + } + static constexpr unsigned __int128 sign_bit = 
static_cast<unsigned __int128>(1) << 127; + if ((unsigned_value & sign_bit) == 0) { + return static_cast<__int128>(unsigned_value); + } + static constexpr __int128 signed_half_range = static_cast<__int128>(1) << 126; + return (static_cast<__int128>(unsigned_value & (sign_bit - 1)) - signed_half_range) - + signed_half_range; +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end || size > static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +bool variant_string_less(std::string_view lhs, std::string_view rhs) { + return std::lexicographical_compare( + lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), [](char left, char right) { + return static_cast<unsigned char>(left) < static_cast<unsigned char>(right); + }); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& 
field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? 
physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { + json->push_back('"'); + static constexpr char hex[] = "0123456789abcdef"; + for (unsigned char c : value) { + switch (c) { + case '"': + json->append("\\\""); + break; + case '\\': + json->append("\\\\"); + break; + case '\b': + json->append("\\b"); + break; + case '\f': + json->append("\\f"); + break; + case '\n': + json->append("\\n"); + break; + case '\r': + json->append("\\r"); + break; + case '\t': + json->append("\\t"); + break; + default: + if (c < 0x20) { + json->append("\\u00"); + json->push_back(hex[c >> 4]); + json->push_back(hex[c & 0x0f]); + } else { + json->push_back(static_cast<char>(c)); + } + break; + } + } + json->push_back('"'); +} + +template <typename T> +Status append_floating_json(T value, std::string* json) { + if (!std::isfinite(value)) { + return Status::NotSupported( + "Parquet VARIANT non-finite floating point value is not supported"); + } + std::ostringstream oss; + oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value; + json->append(oss.str()); + return Status::OK(); +} + +std::string int128_to_string(__int128 value) { + if (value == 0) { + return "0"; + } + bool negative = value < 0; + unsigned __int128 unsigned_value = negative ? static_cast<unsigned __int128>(-(value + 1)) + 1 + : static_cast<unsigned __int128>(value); + std::string digits; + while (unsigned_value > 0) { + digits.push_back(static_cast<char>('0' + unsigned_value % 10)); + unsigned_value /= 10; + } + if (negative) { + digits.push_back('-'); + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +void append_decimal_json(__int128 unscaled, int scale, std::string* json) { + std::string value = int128_to_string(unscaled); + bool negative = !value.empty() && value[0] == '-'; + std::string digits = negative ? 
value.substr(1) : value; + if (scale == 0) { + json->append(value); + return; + } + if (scale > 0) { + if (digits.size() <= static_cast<size_t>(scale)) { + digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), '0'); + } + digits.insert(digits.end() - scale, '.'); + if (negative) { + json->push_back('-'); + } + json->append(digits); + return; + } + if (negative) { + json->push_back('-'); + } + json->append(digits); + json->append(static_cast<size_t>(-scale), '0'); +} + +void append_uuid_json(const uint8_t* ptr, std::string* json) { + static constexpr char hex[] = "0123456789abcdef"; + json->push_back('"'); + for (int i = 0; i < 16; ++i) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + json->push_back('-'); + } + json->push_back(hex[ptr[i] >> 4]); + json->push_back(hex[ptr[i] & 0x0f]); + } + json->push_back('"'); +} + +Status decode_metadata(const StringRef& metadata, VariantMetadata* result) { + const auto* ptr = reinterpret_cast<const uint8_t*>(metadata.data); + const auto* end = ptr + metadata.size; + RETURN_IF_ERROR(require_available(ptr, end, 1, "metadata")); + uint8_t header = *ptr++; Review Comment: The metadata header accepts reserved bit 5 silently. The current grammar only defines the low 4 version bits, bit 4 for `sorted_strings`, and bits 6-7 for offset width, so a header such as `0x21` is not a valid version-1 encoding but is decoded as normal metadata. Please reject headers with reserved bits set and add a malformed-metadata unit case. ########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,576 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +namespace doris::parquet { + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i < size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +__int128 read_signed_int128_le(const uint8_t* ptr) { + unsigned __int128 unsigned_value = 0; + for (int i = 15; i >= 0; --i) { + unsigned_value <<= 8; + unsigned_value |= ptr[i]; + } + static constexpr unsigned __int128 sign_bit = static_cast<unsigned __int128>(1) << 127; + if ((unsigned_value & sign_bit) == 0) { + return static_cast<__int128>(unsigned_value); + } + static constexpr __int128 signed_half_range = static_cast<__int128>(1) << 126; + return (static_cast<__int128>(unsigned_value & (sign_bit - 1)) - signed_half_range) - + signed_half_range; +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end || size > 
static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +bool variant_string_less(std::string_view lhs, std::string_view rhs) { + return std::lexicographical_compare( + lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), [](char left, char right) { + return static_cast<unsigned char>(left) < static_cast<unsigned char>(right); + }); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + 
physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { Review Comment: `append_json_string()` escapes control characters but otherwise copies arbitrary bytes into the JSON output. Parquet VARIANT metadata keys and string values are UTF-8 strings, so a corrupt payload containing bytes like `0xff` in a dictionary key or short-string value is currently accepted and emitted as invalid JSON instead of being rejected as corruption. Please validate UTF-8 before appending these strings (or reject invalid metadata/string payloads at decode time) and add malformed VARIANT coverage.
########## be/src/format/parquet/vparquet_reader.cpp: ########## @@ -432,11 +440,111 @@ Status ParquetReader::on_before_init_reader(ReaderInitContext* ctx) { RETURN_IF_ERROR(get_file_metadata_schema(&field_desc)); RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name( ctx->tuple_descriptor, *field_desc, ctx->table_info_node)); + auto column_id_result = _create_column_ids_by_name(field_desc, ctx->tuple_descriptor); + ctx->column_ids = std::move(column_id_result.column_ids); + ctx->filter_column_ids = std::move(column_id_result.filter_column_ids); } return Status::OK(); } +ColumnIdResult ParquetReader::_create_column_ids_by_name(const FieldDescriptor* field_desc, + const TupleDescriptor* tuple_descriptor) { + auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc); + mutable_field_desc->assign_ids(); + + std::unordered_map<std::string, const FieldSchema*> table_col_name_to_field_schema_map; + for (int i = 0; i < field_desc->size(); ++i) { + auto field_schema = field_desc->get_column(i); + if (!field_schema) { + continue; + } + table_col_name_to_field_schema_map[field_schema->lower_case_name] = field_schema; + } + + std::set<uint64_t> column_ids; + std::set<uint64_t> filter_column_ids; + + auto process_access_paths = [](const FieldSchema* parquet_field, + const std::vector<TColumnAccessPath>& access_paths, + std::set<uint64_t>& out_ids) { + process_nested_access_paths( + parquet_field, access_paths, out_ids, + [](const FieldSchema* field) { return field->get_column_id(); }, + [](const FieldSchema* field) { return field->get_max_column_id(); }, + HiveParquetNestedColumnUtils::extract_nested_column_ids); + }; + + for (const auto* slot : tuple_descriptor->slots()) { + auto it = table_col_name_to_field_schema_map.find(slot->col_name_lower_case()); + if (it == table_col_name_to_field_schema_map.end()) { + continue; + } + auto field_schema = it->second; + + if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY && + 
slot->col_type() != TYPE_MAP && slot->col_type() != TYPE_VARIANT)) { + column_ids.insert(field_schema->column_id); + if (slot->is_predicate()) { + filter_column_ids.insert(field_schema->column_id); + } Review Comment: Full VARIANT projection is still pruned when the same slot also has a nested access path. The full-root fallback in `process_nested_access_paths()` only fires when `all_access_paths()` is empty or contains a top-level-only path, but FE does not add a root path for a plain projection of `v`; if a predicate or expression also references `v['metric']`, `all_access_paths()` contains only the nested path and the reader selects just that shard. A query like `SELECT cast(v AS string) FROM local(...) WHERE cast(v['metric'] AS bigint) > 0` can therefore reconstruct a partial VARIANT with sibling fields pruned. The same pattern is present in the Hive and Iceberg reader column-id builders. Please ensure a slot that needs the full column keeps the full field range even when nested predicate/access paths are also present, and add coverage for full `v` projection plus a nested predicate/subpath. ########## be/src/format/parquet/schema_desc.cpp: ########## @@ -446,6 +463,39 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem return Status::OK(); } +Status FieldDescriptor::parse_variant_field(const std::vector<tparquet::SchemaElement>& t_schemas, + size_t curr_pos, FieldSchema* variant_field) { + RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, variant_field)); + Review Comment: `parse_variant_field()` accepts extra children under a `VARIANT`-annotated group and the reader later ignores them. A malformed schema like `VARIANT { required binary metadata; optional binary value; optional int32 extra; }` passes this validation, but `variant_to_variant_map()` only consumes `metadata`, `value`, and `typed_value`, silently dropping `extra`. Please reject unrecognized top-level VARIANT children so malformed files fail instead of losing data.
########## be/src/format/parquet/vparquet_column_reader.cpp: ########## @@ -103,6 +123,1126 @@ static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offset } } +static constexpr int64_t UNIX_EPOCH_DAYNR = 719528; +static constexpr int64_t MICROS_PER_SECOND = 1000000; + +static int64_t variant_date_value(const VecDateTimeValue& value) { + return value.daynr() - UNIX_EPOCH_DAYNR; +} + +static int64_t variant_date_value(const DateV2Value<DateV2ValueType>& value) { + return value.daynr() - UNIX_EPOCH_DAYNR; +} + +static int64_t variant_datetime_value(const VecDateTimeValue& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND; +} + +static int64_t variant_datetime_value(const DateV2Value<DateTimeV2ValueType>& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND + value.microsecond(); +} + +static int64_t variant_datetime_value(const TimestampTzValue& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND + value.microsecond(); +} + +static int find_child_idx(const FieldSchema& field, std::string_view name) { + for (int i = 0; i < field.children.size(); ++i) { + if (field.children[i].lower_case_name == name) { + return i; + } + } + return -1; +} + +static bool is_variant_wrapper_typed_value_child(const FieldSchema& field) { + auto type = remove_nullable(field.data_type); + return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY; +} + +static bool is_variant_wrapper_field(const FieldSchema& field, + bool allow_scalar_typed_value_only_wrapper, + bool allow_value_only_wrapper = false) { + auto type = remove_nullable(field.data_type); + if (type->get_primitive_type() != TYPE_STRUCT && type->get_primitive_type() != TYPE_VARIANT) { + return false; + } + + bool has_metadata = false; + bool has_value = 
false; + const FieldSchema* typed_value = nullptr; + for (const auto& child : field.children) { + if (child.lower_case_name == "metadata") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_metadata = true; + continue; + } + if (child.lower_case_name == "value") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_value = true; + continue; + } + if (child.lower_case_name == "typed_value") { + typed_value = &child; + continue; + } + return false; + } + if (has_metadata && has_value) { + return type->get_primitive_type() == TYPE_VARIANT || typed_value != nullptr; + } + if (has_value) { + return typed_value != nullptr || allow_value_only_wrapper; + } + return typed_value != nullptr && (allow_scalar_typed_value_only_wrapper || + is_variant_wrapper_typed_value_child(*typed_value)); +} + +static Status get_binary_field(const Field& field, std::string* value, bool* present) { + if (field.is_null()) { + *present = false; + return Status::OK(); + } + *present = true; + switch (field.get_type()) { + case TYPE_STRING: + *value = field.get<TYPE_STRING>(); + return Status::OK(); + case TYPE_CHAR: + *value = field.get<TYPE_CHAR>(); + return Status::OK(); + case TYPE_VARCHAR: + *value = field.get<TYPE_VARCHAR>(); + return Status::OK(); + case TYPE_VARBINARY: { + auto ref = field.get<TYPE_VARBINARY>().to_string_ref(); + value->assign(ref.data, ref.size); + return Status::OK(); + } + default: + return Status::Corruption("Parquet VARIANT binary field has unexpected Doris type {}", + field.get_type_name()); + } +} + +static PathInData append_path(const PathInData& prefix, const PathInData& suffix) { + if (prefix.empty()) { + return suffix; + } + if (suffix.empty()) { + return prefix; + } + PathInDataBuilder builder; + builder.append(prefix.get_parts(), false); + builder.append(suffix.get_parts(), false); + return builder.build(); +} + +static Status insert_empty_object_marker(const PathInData& path, VariantMap* 
values) { + JsonBinaryValue empty_object; + RETURN_IF_ERROR(empty_object.from_json_string("{}")); + (*values)[path] = FieldWithDataType {.field = Field::create_field<TYPE_JSONB>(JsonbField( + empty_object.value(), empty_object.size())), + .base_scalar_type_id = TYPE_JSONB}; + return Status::OK(); +} + +static bool is_empty_object_marker(const FieldWithDataType& value) { + if (value.field.get_type() != TYPE_JSONB) { + return false; + } + const auto& jsonb = value.field.get<TYPE_JSONB>(); + const JsonbDocument* document = nullptr; + Status st = + JsonbDocument::checkAndCreateDocument(jsonb.get_value(), jsonb.get_size(), &document); + if (!st.ok() || document == nullptr || document->getValue() == nullptr || + !document->getValue()->isObject()) { + return false; + } + return document->getValue()->unpack<ObjectVal>()->numElem() == 0; +} + +static Status collect_empty_object_markers(const rapidjson::Value& value, PathInDataBuilder* path, + VariantMap* values) { + if (!value.IsObject()) { + return Status::OK(); + } + if (value.MemberCount() == 0) { + return insert_empty_object_marker(path->build(), values); + } + for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) { + if (it->value.IsObject()) { + path->append(std::string_view(it->name.GetString(), it->name.GetStringLength()), false); + RETURN_IF_ERROR(collect_empty_object_markers(it->value, path, values)); + path->pop_back(); + } + } + return Status::OK(); +} + +static Status add_empty_object_markers_from_json(const std::string& json, const PathInData& prefix, + VariantMap* values) { + if (json.find("{}") == std::string::npos) { + return Status::OK(); + } + rapidjson::Document document; + document.Parse(json.data(), json.size()); + if (document.HasParseError()) { + return Status::Corruption("Invalid Parquet VARIANT decoded JSON"); + } + PathInDataBuilder path; + path.append(prefix.get_parts(), false); + return collect_empty_object_markers(document, &path, values); +} + +static Status 
parse_json_to_variant_map(const std::string& json, const PathInData& prefix, + VariantMap* values) { + auto parsed_column = ColumnVariant::create(0, false); + ParseConfig parse_config; + StringRef json_ref(json.data(), json.size()); + RETURN_IF_CATCH_EXCEPTION( + variant_util::parse_json_to_variant(*parsed_column, json_ref, nullptr, parse_config)); + Field parsed = (*parsed_column)[0]; + if (!parsed.is_null()) { + auto& parsed_values = parsed.get<TYPE_VARIANT>(); + for (auto& [path, value] : parsed_values) { + (*values)[append_path(prefix, path)] = std::move(value); + } + } + RETURN_IF_ERROR(add_empty_object_markers_from_json(json, prefix, values)); + return Status::OK(); +} + +static Status variant_map_to_json(VariantMap values, std::string* json) { + auto variant_column = ColumnVariant::create(0, false); + RETURN_IF_CATCH_EXCEPTION( + variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values)))); + DataTypeSerDe::FormatOptions options; + variant_column->serialize_one_row_to_string(0, json, options); + return Status::OK(); +} + +static bool path_has_prefix(const PathInData& path, const PathInData& prefix) { + const auto& parts = path.get_parts(); + const auto& prefix_parts = prefix.get_parts(); + if (parts.size() < prefix_parts.size()) { + return false; + } + for (size_t i = 0; i < prefix_parts.size(); ++i) { + if (parts[i] != prefix_parts[i]) { + return false; + } + } + return true; +} + +static bool has_descendant_path(const VariantMap& values, const PathInData& prefix) { + const size_t prefix_size = prefix.get_parts().size(); + return std::ranges::any_of(values, [&](const auto& entry) { + const auto& path = entry.first; + return path.get_parts().size() > prefix_size && path_has_prefix(path, prefix); + }); +} + +static void erase_shadowed_empty_object_markers(VariantMap* values, + const VariantMap& shadowing_values) { + for (auto it = values->begin(); it != values->end();) { + if (is_empty_object_marker(it->second) && + 
(has_descendant_path(*values, it->first) || + has_descendant_path(shadowing_values, it->first))) { + it = values->erase(it); + continue; + } + ++it; + } +} + +static void erase_shadowed_empty_object_markers(VariantMap* value_values, + VariantMap* typed_values) { + erase_shadowed_empty_object_markers(value_values, *typed_values); + erase_shadowed_empty_object_markers(typed_values, *value_values); +} + +static Status check_no_shredded_value_typed_duplicates(const VariantMap& value_values, + const VariantMap& typed_values, + const PathInData& prefix) { + const size_t prefix_size = prefix.get_parts().size(); + for (const auto& value_entry : value_values) { + const auto& value_path = value_entry.first; + if (!path_has_prefix(value_path, prefix)) { + continue; + } + if (value_path.get_parts().size() == prefix_size) { + if (is_empty_object_marker(value_entry.second) && + !has_descendant_path(typed_values, value_path)) { + continue; + } + if (!typed_values.empty()) { + return Status::Corruption( + "Parquet VARIANT residual value conflicts with typed_value at path {}", + value_path.get_path()); + } + continue; + } + for (const auto& typed_entry : typed_values) { + const auto& typed_path = typed_entry.first; + if (!path_has_prefix(typed_path, prefix)) { + continue; + } + if (typed_path.get_parts().size() == prefix_size) { + if (is_empty_object_marker(typed_entry.second) && + !has_descendant_path(value_values, typed_path)) { + continue; + } + return Status::Corruption( + "Parquet VARIANT residual value and typed_value contain duplicate field {}", + value_path.get_parts()[prefix_size].key); + } + if (value_path.get_parts()[prefix_size] == typed_path.get_parts()[prefix_size]) { + if (value_path == typed_path && is_empty_object_marker(value_entry.second) && + is_empty_object_marker(typed_entry.second)) { + continue; + } + return Status::Corruption( + "Parquet VARIANT residual value and typed_value contain duplicate field {}", + value_path.get_parts()[prefix_size].key); + } + } + 
} + return Status::OK(); +} + +static bool has_direct_typed_parent_null(const std::vector<const NullMap*>& null_maps, size_t row) { + return std::ranges::any_of(null_maps, [&](const NullMap* null_map) { + DCHECK_LT(row, null_map->size()); + return (*null_map)[row]; + }); +} + +static void insert_direct_typed_leaf_range(const IColumn& column, size_t start, size_t rows, + const std::vector<const NullMap*>& parent_null_maps, + IColumn* variant_leaf) { + auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf); + const IColumn* value_column = &column; + const NullMap* leaf_null_map = nullptr; + if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) { + value_column = &nullable_column->get_nested_column(); + leaf_null_map = &nullable_column->get_null_map_data(); + } + + nullable_leaf.get_nested_column().insert_range_from(*value_column, start, rows); + auto& null_map = nullable_leaf.get_null_map_data(); + null_map.reserve(null_map.size() + rows); + for (size_t i = 0; i < rows; ++i) { + const size_t row = start + i; + const bool leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row]; + null_map.push_back(leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row)); + } +} + +static bool is_temporal_variant_leaf_type(PrimitiveType type) { + switch (type) { + case TYPE_TIMEV2: + case TYPE_DATE: + case TYPE_DATETIME: + case TYPE_DATEV2: + case TYPE_DATETIMEV2: + case TYPE_TIMESTAMPTZ: + return true; + default: + return false; + } +} + +static DataTypePtr direct_variant_leaf_type(const DataTypePtr& data_type) { + const auto& type = remove_nullable(data_type); + if (is_temporal_variant_leaf_type(type->get_primitive_type())) { + return std::make_shared<DataTypeInt64>(); + } + return type; +} + +static bool contains_temporal_variant_leaf_type(const DataTypePtr& data_type) { + const auto& type = remove_nullable(data_type); + if (is_temporal_variant_leaf_type(type->get_primitive_type())) { + return true; + } + if 
(type->get_primitive_type() == TYPE_ARRAY) { + return contains_temporal_variant_leaf_type( + assert_cast<const DataTypeArray*>(type.get())->get_nested_type()); + } + return false; +} + +static int64_t direct_temporal_variant_value(PrimitiveType type, const IColumn& column, + size_t row) { + switch (type) { + case TYPE_TIMEV2: + return static_cast<int64_t>( + std::llround(assert_cast<const ColumnTimeV2&>(column).get_data()[row])); + case TYPE_DATE: + return variant_date_value(assert_cast<const ColumnDate&>(column).get_data()[row]); + case TYPE_DATETIME: + return variant_datetime_value(assert_cast<const ColumnDateTime&>(column).get_data()[row]); + case TYPE_DATEV2: + return variant_date_value(assert_cast<const ColumnDateV2&>(column).get_data()[row]); + case TYPE_DATETIMEV2: + return variant_datetime_value(assert_cast<const ColumnDateTimeV2&>(column).get_data()[row]); + case TYPE_TIMESTAMPTZ: + return variant_datetime_value( + assert_cast<const ColumnTimeStampTz&>(column).get_data()[row]); + default: + DORIS_CHECK(false); + return 0; + } +} + +static void insert_direct_typed_temporal_leaf_range( + PrimitiveType type, const IColumn& column, size_t start, size_t rows, + const std::vector<const NullMap*>& parent_null_maps, IColumn* variant_leaf) { + auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf); + const IColumn* value_column = &column; + const NullMap* leaf_null_map = nullptr; + if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) { + value_column = &nullable_column->get_nested_column(); + leaf_null_map = &nullable_column->get_null_map_data(); + } + + auto& data = assert_cast<ColumnInt64&>(nullable_leaf.get_nested_column()).get_data(); + data.reserve(data.size() + rows); + auto& null_map = nullable_leaf.get_null_map_data(); + null_map.reserve(null_map.size() + rows); + for (size_t i = 0; i < rows; ++i) { + const size_t row = start + i; + data.push_back(direct_temporal_variant_value(type, *value_column, row)); + const bool 
leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row]; + null_map.push_back(leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row)); + } +} + +static void append_json_string(std::string_view value, std::string* json) { + auto column = ColumnString::create(); + VectorBufferWriter writer(*column); + writer.write_json_string(value); + writer.commit(); + json->append(column->get_data_at(0).data, column->get_data_at(0).size); +} + +static bool is_column_selected(const FieldSchema& field_schema, + const std::set<uint64_t>& column_ids) { + return column_ids.empty() || column_ids.find(field_schema.get_column_id()) != column_ids.end(); +} + +static bool has_selected_column(const FieldSchema& field_schema, + const std::set<uint64_t>& column_ids) { + if (is_column_selected(field_schema, column_ids)) { + return true; + } + return std::any_of(field_schema.children.begin(), field_schema.children.end(), + [&column_ids](const FieldSchema& child) { + return has_selected_column(child, column_ids); + }); +} + +static bool is_direct_variant_leaf_type(const DataTypePtr& data_type) { + const auto& type = remove_nullable(data_type); + switch (type->get_primitive_type()) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + case TYPE_LARGEINT: + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: + case TYPE_DECIMAL256: + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: + return true; + case TYPE_TIMEV2: + case TYPE_DATE: + case TYPE_DATETIME: + case TYPE_DATEV2: + case TYPE_DATETIMEV2: + case TYPE_TIMESTAMPTZ: + return true; + case TYPE_ARRAY: { + const auto* array_type = assert_cast<const DataTypeArray*>(type.get()); + return !contains_temporal_variant_leaf_type(array_type->get_nested_type()) && + is_direct_variant_leaf_type(array_type->get_nested_type()); + } + default: + return false; + } +} + +static bool can_direct_read_typed_value(const FieldSchema& 
field_schema, bool allow_variant_wrapper, + const std::set<uint64_t>& column_ids) { + if (!has_selected_column(field_schema, column_ids)) { + return true; + } + if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) { + const int value_idx = find_child_idx(field_schema, "value"); + const int typed_value_idx = find_child_idx(field_schema, "typed_value"); + return (value_idx < 0 || + !has_selected_column(field_schema.children[value_idx], column_ids)) && + typed_value_idx >= 0 && + can_direct_read_typed_value(field_schema.children[typed_value_idx], false, + column_ids); + } + + const auto& type = remove_nullable(field_schema.data_type); + if (type->get_primitive_type() == TYPE_STRUCT) { + return std::all_of(field_schema.children.begin(), field_schema.children.end(), + [&column_ids](const FieldSchema& child) { + return can_direct_read_typed_value(child, true, column_ids); + }); + } + return is_direct_variant_leaf_type(field_schema.data_type); +} + +static bool has_selected_direct_typed_leaf(const FieldSchema& field_schema, + bool allow_variant_wrapper, + const std::set<uint64_t>& column_ids) { + if (!has_selected_column(field_schema, column_ids)) { + return false; + } + if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) { + const int typed_value_idx = find_child_idx(field_schema, "typed_value"); + DCHECK_GE(typed_value_idx, 0); + return has_selected_direct_typed_leaf(field_schema.children[typed_value_idx], false, + column_ids); + } + + const auto& type = remove_nullable(field_schema.data_type); + if (type->get_primitive_type() == TYPE_STRUCT) { + return std::any_of(field_schema.children.begin(), field_schema.children.end(), + [&column_ids](const FieldSchema& child) { + return has_selected_direct_typed_leaf(child, true, column_ids); + }); + } + return is_direct_variant_leaf_type(field_schema.data_type); +} + +static bool can_use_direct_typed_only_value(const FieldSchema& variant_field, + const std::set<uint64_t>& column_ids) 
{ + const int value_idx = find_child_idx(variant_field, "value"); + const int typed_value_idx = find_child_idx(variant_field, "typed_value"); + return (value_idx < 0 || !has_selected_column(variant_field.children[value_idx], column_ids)) && + typed_value_idx >= 0 && + has_selected_direct_typed_leaf(variant_field.children[typed_value_idx], false, + column_ids) && + can_direct_read_typed_value(variant_field.children[typed_value_idx], false, column_ids); +} + +static void fill_variant_field_info(FieldWithDataType* value) { + FieldInfo info; + variant_util::get_field_info(value->field, &info); + DCHECK_LE(info.num_dimensions, std::numeric_limits<uint8_t>::max()); + value->base_scalar_type_id = info.scalar_type_id; + value->num_dimensions = static_cast<uint8_t>(info.num_dimensions); +} + +static void fill_variant_leaf_type_info(const DataTypePtr& data_type, FieldWithDataType* value) { + auto leaf_type = remove_nullable(data_type); + while (leaf_type->get_primitive_type() == TYPE_ARRAY) { + leaf_type = remove_nullable( + assert_cast<const DataTypeArray*>(leaf_type.get())->get_nested_type()); + } + if (is_decimal(leaf_type->get_primitive_type())) { + value->precision = leaf_type->get_precision(); + value->scale = leaf_type->get_scale(); + } +} + +template <PrimitiveType Primitive> +static Status fill_floating_point_variant_field(const Field& field, FieldWithDataType* value) { + const auto typed_value = field.get<Primitive>(); + if (!std::isfinite(typed_value)) { + return Status::NotSupported( + "Parquet VARIANT non-finite floating point typed_value is not supported"); + } + value->field = field; + fill_variant_field_info(value); + return Status::OK(); +} + +static Status field_to_variant_field(const FieldSchema& field_schema, const Field& field, + FieldWithDataType* value, bool* present) { + if (field.is_null()) { + *present = false; + return Status::OK(); + } + *present = true; + const DataTypePtr& type = remove_nullable(field_schema.data_type); + switch 
(type->get_primitive_type()) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + case TYPE_LARGEINT: + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: + case TYPE_DECIMAL256: + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_ARRAY: + value->field = field; + fill_variant_field_info(value); + fill_variant_leaf_type_info(type, value); + return Status::OK(); + case TYPE_FLOAT: + return fill_floating_point_variant_field<TYPE_FLOAT>(field, value); + case TYPE_DOUBLE: + return fill_floating_point_variant_field<TYPE_DOUBLE>(field, value); + case TYPE_TIMEV2: + value->field = Field::create_field<TYPE_BIGINT>( + static_cast<int64_t>(std::llround(field.get<TYPE_TIMEV2>()))); + value->base_scalar_type_id = TYPE_BIGINT; + return Status::OK(); + case TYPE_DATE: + value->field = Field::create_field<TYPE_BIGINT>(variant_date_value(field.get<TYPE_DATE>())); + value->base_scalar_type_id = TYPE_BIGINT; + return Status::OK(); + case TYPE_DATETIME: + value->field = Field::create_field<TYPE_BIGINT>( + variant_datetime_value(field.get<TYPE_DATETIME>())); + value->base_scalar_type_id = TYPE_BIGINT; + return Status::OK(); + case TYPE_DATEV2: + value->field = + Field::create_field<TYPE_BIGINT>(variant_date_value(field.get<TYPE_DATEV2>())); + value->base_scalar_type_id = TYPE_BIGINT; + return Status::OK(); + case TYPE_DATETIMEV2: + value->field = Field::create_field<TYPE_BIGINT>( + variant_datetime_value(field.get<TYPE_DATETIMEV2>())); + value->base_scalar_type_id = TYPE_BIGINT; + return Status::OK(); + case TYPE_TIMESTAMPTZ: + value->field = Field::create_field<TYPE_BIGINT>( + variant_datetime_value(field.get<TYPE_TIMESTAMPTZ>())); + value->base_scalar_type_id = TYPE_BIGINT; + return Status::OK(); + case TYPE_VARBINARY: + return Status::NotSupported("Parquet VARIANT binary typed_value is not supported"); + default: + return Status::Corruption("Unsupported Parquet VARIANT 
typed_value Doris type {}", + type->get_name()); + } +} + +static Status typed_value_to_json(const FieldSchema& typed_value_field, const Field& field, + const std::string& metadata, std::string* json, bool* present); + +static Status serialize_field_to_json(const DataTypePtr& data_type, const Field& field, + std::string* json) { + MutableColumnPtr column = data_type->create_column(); + column->insert(field); + + auto json_column = ColumnString::create(); + VectorBufferWriter writer(*json_column); + auto serde = data_type->get_serde(); + DataTypeSerDe::FormatOptions options; + RETURN_IF_ERROR(serde->serialize_one_cell_to_json(*column, 0, writer, options)); + writer.commit(); + *json = json_column->get_data_at(0).to_string(); + return Status::OK(); +} + +static Status scalar_typed_value_to_json(const FieldSchema& field_schema, const Field& field, + std::string* json, bool* present) { + FieldWithDataType value; + RETURN_IF_ERROR(field_to_variant_field(field_schema, field, &value, present)); + if (!*present) { + return Status::OK(); + } + if (value.field.is_null()) { + *json = "null"; + return Status::OK(); + } + + DataTypePtr json_type; + if (value.base_scalar_type_id != PrimitiveType::INVALID_TYPE) { + json_type = DataTypeFactory::instance().create_data_type(value.base_scalar_type_id, false, + value.precision, value.scale); + } else { + json_type = remove_nullable(field_schema.data_type); + } + return serialize_field_to_json(json_type, value.field, json); +} + +static Status resolve_variant_metadata(const FieldSchema& variant_field, const Struct& fields, + const std::string* inherited_metadata, std::string* metadata, + bool* has_metadata) { + *has_metadata = false; + if (inherited_metadata != nullptr) { + *metadata = *inherited_metadata; + *has_metadata = true; + } + + const int metadata_idx = find_child_idx(variant_field, "metadata"); + if (metadata_idx >= 0) { + bool metadata_present = false; + RETURN_IF_ERROR(get_binary_field(fields[metadata_idx], metadata, 
&metadata_present)); + *has_metadata = metadata_present; + } + return Status::OK(); +} + +static Status variant_typed_value_to_json(const FieldSchema& variant_field, const Struct& fields, + const std::string& metadata, std::string* typed_json, + bool* typed_present) { + *typed_present = false; + const int typed_value_idx = find_child_idx(variant_field, "typed_value"); + if (typed_value_idx < 0) { + return Status::OK(); + } + return typed_value_to_json(variant_field.children[typed_value_idx], fields[typed_value_idx], + metadata, typed_json, typed_present); +} + +static Status variant_residual_value_to_json(const FieldSchema& variant_field, const Struct& fields, + const std::string& metadata, bool has_metadata, + std::string* value_json, bool* value_present) { + *value_present = false; + const int value_idx = find_child_idx(variant_field, "value"); + if (value_idx < 0) { + return Status::OK(); + } + + std::string value; + RETURN_IF_ERROR(get_binary_field(fields[value_idx], &value, value_present)); + if (!*value_present) { + return Status::OK(); + } + if (!has_metadata) { + return Status::Corruption("Parquet VARIANT value is present without metadata"); + } + return parquet::decode_variant_to_json(StringRef(metadata.data(), metadata.size()), + StringRef(value.data(), value.size()), value_json); +} + +static Status merge_variant_value_and_typed_json(const std::string& value_json, + const std::string& typed_json, std::string* json) { + VariantMap value_values; + RETURN_IF_ERROR(parse_json_to_variant_map(value_json, PathInData(), &value_values)); + VariantMap typed_values; + RETURN_IF_ERROR(parse_json_to_variant_map(typed_json, PathInData(), &typed_values)); + erase_shadowed_empty_object_markers(&value_values, &typed_values); + auto root_value = value_values.find(PathInData()); + if (root_value != value_values.end() && !is_empty_object_marker(root_value->second)) { + return Status::Corruption( + "Parquet VARIANT has conflicting non-object value and typed_value"); + } + 
RETURN_IF_ERROR( + check_no_shredded_value_typed_duplicates(value_values, typed_values, PathInData())); + value_values.merge(std::move(typed_values)); + return variant_map_to_json(std::move(value_values), json); +} + +static Status variant_to_json(const FieldSchema& variant_field, const Field& field, + const std::string* inherited_metadata, std::string* json, + bool* present) { + if (field.is_null()) { + *present = false; + return Status::OK(); + } + + const auto& fields = field.get<TYPE_STRUCT>(); + std::string metadata; + bool has_metadata = false; + RETURN_IF_ERROR(resolve_variant_metadata(variant_field, fields, inherited_metadata, &metadata, + &has_metadata)); + + std::string typed_json; + bool typed_present = false; + RETURN_IF_ERROR(variant_typed_value_to_json(variant_field, fields, metadata, &typed_json, + &typed_present)); + + std::string value_json; + bool value_present = false; + RETURN_IF_ERROR(variant_residual_value_to_json(variant_field, fields, metadata, has_metadata, + &value_json, &value_present)); + + if (value_present && typed_present) { + RETURN_IF_ERROR(merge_variant_value_and_typed_json(value_json, typed_json, json)); + *present = true; + return Status::OK(); + } + + if (typed_present) { + *json = std::move(typed_json); + *present = true; + return Status::OK(); + } + if (value_present) { + *json = std::move(value_json); + *present = true; + return Status::OK(); + } + + *present = false; + return Status::OK(); +} + +static Status shredded_field_to_json(const FieldSchema& field_schema, const Field& field, + const std::string& metadata, std::string* json, bool* present, + bool allow_scalar_typed_value_only_wrapper) { + if (is_variant_wrapper_field(field_schema, allow_scalar_typed_value_only_wrapper, true)) { + return variant_to_json(field_schema, field, &metadata, json, present); + } + return typed_value_to_json(field_schema, field, metadata, json, present); +} + +static Status typed_array_to_json(const FieldSchema& typed_value_field, const Field& 
field, + const std::string& metadata, std::string* json, bool* present) { + if (field.is_null()) { + *present = false; + return Status::OK(); + } + if (typed_value_field.children.empty()) { + return Status::Corruption("Parquet VARIANT array typed_value has no element schema"); + } + + const auto& elements = field.get<TYPE_ARRAY>(); + const auto& element_schema = typed_value_field.children[0]; + json->clear(); + json->push_back('['); + for (size_t i = 0; i < elements.size(); ++i) { + if (i != 0) { + json->push_back(','); + } + std::string element_json; + bool element_present = false; + RETURN_IF_ERROR(shredded_field_to_json(element_schema, elements[i], metadata, &element_json, + &element_present, true)); + if (element_present) { + json->append(element_json); + } else { + return Status::Corruption("Parquet VARIANT array typed_value element is missing"); + } + } + json->push_back(']'); + *present = true; + return Status::OK(); +} + +static Status typed_struct_to_json(const FieldSchema& typed_value_field, const Field& field, + const std::string& metadata, std::string* json, bool* present) { + if (field.is_null()) { + *present = false; + return Status::OK(); + } + + const auto& fields = field.get<TYPE_STRUCT>(); + json->clear(); + json->push_back('{'); + bool first = true; + for (int i = 0; i < typed_value_field.children.size(); ++i) { + std::string child_json; + bool child_present = false; + RETURN_IF_ERROR(shredded_field_to_json(typed_value_field.children[i], fields[i], metadata, + &child_json, &child_present, false)); + if (!child_present) { + continue; + } + if (!first) { + json->push_back(','); + } + append_json_string(typed_value_field.children[i].name, json); + json->push_back(':'); + json->append(child_json); + first = false; + } + json->push_back('}'); + *present = true; + return Status::OK(); +} + +static Status typed_value_to_json(const FieldSchema& typed_value_field, const Field& field, + const std::string& metadata, std::string* json, bool* present) { + 
const DataTypePtr& typed_type = remove_nullable(typed_value_field.data_type); + switch (typed_type->get_primitive_type()) { + case TYPE_STRUCT: + return typed_struct_to_json(typed_value_field, field, metadata, json, present); + case TYPE_ARRAY: + return typed_array_to_json(typed_value_field, field, metadata, json, present); + default: + return scalar_typed_value_to_json(typed_value_field, field, json, present); + } +} + +static Status typed_value_to_variant_map(const FieldSchema& typed_value_field, const Field& field, + const std::string& metadata, PathInDataBuilder* path, + VariantMap* values, bool* present); + +static Status variant_to_variant_map(const FieldSchema& variant_field, const Field& field, + const std::string* inherited_metadata, PathInDataBuilder* path, + VariantMap* values, bool* present) { + if (field.is_null()) { + *present = false; + return Status::OK(); + } + const auto& fields = field.get<TYPE_STRUCT>(); + const int metadata_idx = find_child_idx(variant_field, "metadata"); + const int value_idx = find_child_idx(variant_field, "value"); + const int typed_value_idx = find_child_idx(variant_field, "typed_value"); + + std::string metadata; + bool has_metadata = false; + if (inherited_metadata != nullptr) { + metadata = *inherited_metadata; + has_metadata = true; + } + if (metadata_idx >= 0) { + bool metadata_present = false; + RETURN_IF_ERROR(get_binary_field(fields[metadata_idx], &metadata, &metadata_present)); + has_metadata = metadata_present; + } + + VariantMap value_values; + bool value_present = false; + const PathInData current_path = path->build(); + if (value_idx >= 0) { + std::string value; + RETURN_IF_ERROR(get_binary_field(fields[value_idx], &value, &value_present)); + if (value_present) { + if (!has_metadata) { + return Status::Corruption("Parquet VARIANT value is present without metadata"); + } + std::string value_json; + RETURN_IF_ERROR(parquet::decode_variant_to_json( + StringRef(metadata.data(), metadata.size()), + 
StringRef(value.data(), value.size()), &value_json)); + RETURN_IF_ERROR(parse_json_to_variant_map(value_json, current_path, &value_values)); + } + } + + VariantMap typed_values; + bool typed_present = false; + if (typed_value_idx >= 0) { + RETURN_IF_ERROR(typed_value_to_variant_map(variant_field.children[typed_value_idx], + fields[typed_value_idx], metadata, path, + &typed_values, &typed_present)); + } + + erase_shadowed_empty_object_markers(&value_values, &typed_values); + auto current_value = value_values.find(current_path); + if (value_present && typed_present && current_value != value_values.end() && + !is_empty_object_marker(current_value->second)) { + return Status::Corruption( + "Parquet VARIANT has conflicting non-object value and typed_value"); + } + RETURN_IF_ERROR( + check_no_shredded_value_typed_duplicates(value_values, typed_values, current_path)); + values->merge(std::move(value_values)); + values->merge(std::move(typed_values)); + *present = value_present || typed_present; + return Status::OK(); +} + +static Status shredded_field_to_variant_map(const FieldSchema& field_schema, const Field& field, + const std::string& metadata, PathInDataBuilder* path, + VariantMap* values, bool* present) { + if (is_variant_wrapper_field(field_schema, false, true)) { + return variant_to_variant_map(field_schema, field, &metadata, path, values, present); + } + return typed_value_to_variant_map(field_schema, field, metadata, path, values, present); +} + +static Status append_typed_field_to_variant_map(const FieldSchema& typed_value_field, + const Field& field, PathInDataBuilder* path, + VariantMap* values, bool* present) { + FieldWithDataType value; + RETURN_IF_ERROR(field_to_variant_field(typed_value_field, field, &value, present)); + if (*present) { + (*values)[path->build()] = std::move(value); + } + return Status::OK(); +} + +static Status typed_array_to_variant_map(const FieldSchema& typed_value_field, const Field& field, + const std::string& metadata, 
PathInDataBuilder* path, + VariantMap* values, bool* present) { + if (is_direct_variant_leaf_type(typed_value_field.data_type)) { + return append_typed_field_to_variant_map(typed_value_field, field, path, values, present); + } + + std::string value_json; + RETURN_IF_ERROR(typed_value_to_json(typed_value_field, field, metadata, &value_json, present)); + if (*present) { + RETURN_IF_ERROR(parse_json_to_variant_map(value_json, path->build(), values)); + } + return Status::OK(); +} + +static Status typed_value_to_variant_map(const FieldSchema& typed_value_field, const Field& field, + const std::string& metadata, PathInDataBuilder* path, + VariantMap* values, bool* present) { + if (field.is_null()) { + *present = false; + return Status::OK(); + } + const DataTypePtr& typed_type = remove_nullable(typed_value_field.data_type); + if (typed_type->get_primitive_type() == TYPE_STRUCT) { + const auto& fields = field.get<TYPE_STRUCT>(); + *present = true; + bool has_present_child = false; + for (int i = 0; i < typed_value_field.children.size(); ++i) { + path->append(typed_value_field.children[i].name, false); + bool child_present = false; + RETURN_IF_ERROR(shredded_field_to_variant_map(typed_value_field.children[i], fields[i], + metadata, path, values, &child_present)); + has_present_child |= child_present; + path->pop_back(); + } + if (!has_present_child) { + RETURN_IF_ERROR(insert_empty_object_marker(path->build(), values)); + } + return Status::OK(); + } + if (typed_type->get_primitive_type() == TYPE_ARRAY) { + return typed_array_to_variant_map(typed_value_field, field, metadata, path, values, + present); + } + + return append_typed_field_to_variant_map(typed_value_field, field, path, values, present); +} + +static Status append_direct_typed_column_to_batch(const FieldSchema& field_schema, + const IColumn& column, size_t start, size_t rows, + PathInDataBuilder* path, ColumnVariant* batch, + bool allow_variant_wrapper, + const std::set<uint64_t>& column_ids, + 
std::vector<const NullMap*> parent_null_maps) { + if (!has_selected_column(field_schema, column_ids)) { + return Status::OK(); + } + + const IColumn* value_column = &column; + if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) { + parent_null_maps.push_back(&nullable_column->get_null_map_data()); + value_column = &nullable_column->get_nested_column(); + } + + if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) { + const int typed_value_idx = find_child_idx(field_schema, "typed_value"); + DCHECK_GE(typed_value_idx, 0); + const auto& typed_struct = assert_cast<const ColumnStruct&>(*value_column); + return append_direct_typed_column_to_batch( + field_schema.children[typed_value_idx], typed_struct.get_column(typed_value_idx), + start, rows, path, batch, false, column_ids, parent_null_maps); + } + Review Comment: The direct typed path does not preserve present empty typed objects. The row-wise path (`typed_value_to_variant_map()`) marks a present struct with no present children via `insert_empty_object_marker()`, but this direct path only copies selected leaf subcolumns. For a typed-only layout such as `typed_value.obj { optional int32 x; }`, a row where `obj` is present and `x` is absent/null can take the direct path and only add nullable `obj.x`; the empty object marker is never emitted, so the object can disappear or become `obj.x = null` instead of `obj = {}`. Please either make the direct path ineligible for nullable typed structs that need empty-object markers, or emit equivalent markers, and add coverage for a present empty typed struct.
########## be/src/format/table/iceberg/types.cpp: ########## @@ -170,6 +170,8 @@ std::unique_ptr<PrimitiveType> Types::from_primitive_string(const std::string& t return std::make_unique<UUIDType>(); } else if (lower_type_string == "binary") { return std::make_unique<BinaryType>(); + } else if (lower_type_string == "variant") { Review Comment: This exposes Iceberg `variant` to the BE Iceberg schema model, but the Iceberg Parquet write path still cannot convert it. `VParquetTransformer::_parse_schema()` calls `ArrowSchemaUtil::convert()`, and `be/src/format/table/iceberg/arrow_schema_util.cpp` has no `TypeID::VARIANT` case, so an Iceberg table with a VARIANT column can be mapped for reads but `INSERT INTO`/write planning will fail at runtime with `Unsupported field type:variant`. If writes are intentionally unsupported, FE should reject Iceberg VARIANT writes before sending them to BE; otherwise the Arrow/Iceberg schema conversion needs matching VARIANT support. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
