github-actions[bot] commented on code in PR #63192: URL: https://github.com/apache/doris/pull/63192#discussion_r3253077731
########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,949 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <deque> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +#include "core/column/column_variant.h" +#include "core/value/jsonb_value.h" +#include "exec/common/variant_util.h" + +namespace doris::parquet { + +std::string format_variant_uuid(const uint8_t* ptr) { + static constexpr char hex[] = "0123456789abcdef"; + std::string uuid; + uuid.reserve(36); + for (int i = 0; i < 16; ++i) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + uuid.push_back('-'); + } + uuid.push_back(hex[ptr[i] >> 4]); + uuid.push_back(hex[ptr[i] & 0x0f]); + } + return uuid; +} + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +struct VariantObjectLayout { + std::vector<uint64_t> field_ids; + std::vector<uint64_t> field_offsets; + const uint8_t* fields = nullptr; + uint64_t total_size = 0; +}; + +struct VariantArrayLayout { + std::vector<uint64_t> field_offsets; + const uint8_t* fields = nullptr; + uint64_t total_size = 0; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i < size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +__int128 read_signed_int128_le(const uint8_t* ptr) { + unsigned __int128 unsigned_value = 0; + for (int i = 15; i >= 0; --i) { + unsigned_value <<= 8; + unsigned_value |= ptr[i]; + } + static constexpr unsigned __int128 sign_bit = static_cast<unsigned __int128>(1) << 127; + if ((unsigned_value & sign_bit) == 0) { + return static_cast<__int128>(unsigned_value); + } + static constexpr __int128 signed_half_range = static_cast<__int128>(1) << 126; + return (static_cast<__int128>(unsigned_value & (sign_bit - 1)) - signed_half_range) - + signed_half_range; +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + if (size > static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +bool variant_string_less(std::string_view lhs, std::string_view rhs) { + return std::lexicographical_compare( + lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), [](char left, char right) { + return static_cast<unsigned char>(left) < static_cast<unsigned char>(right); + }); +} + +bool is_valid_utf8(std::string_view value) { + const auto* data = reinterpret_cast<const uint8_t*>(value.data()); + const auto* end = data + value.size(); + while (data < end) { + const uint8_t first = *data++; + if (first <= 0x7f) { + continue; + } + + uint32_t code_point = 0; + size_t continuation_bytes = 0; + if (first >= 0xc2 && first <= 0xdf) { + code_point = first & 0x1f; + continuation_bytes = 1; + } else if (first >= 0xe0 && first <= 0xef) { + code_point = first & 0x0f; + continuation_bytes = 2; + } else if (first >= 0xf0 && first <= 0xf4) { + code_point = first & 0x07; + continuation_bytes = 3; + } else { + return false; + } + + if (static_cast<size_t>(end - data) < continuation_bytes) { + return false; + } + for (size_t i = 0; i < continuation_bytes; ++i) { + const uint8_t byte = *data++; + if ((byte & 0xc0) != 0x80) { + return false; + } + code_point = (code_point << 6) | (byte & 0x3f); + } + + if ((continuation_bytes == 2 && code_point < 0x800) || + (continuation_bytes == 3 && code_point < 0x10000) || + (code_point >= 0xd800 && code_point <= 0xdfff) || code_point > 0x10ffff) { + return false; + } + } + return true; +} + +Status require_valid_utf8(std::string_view value, std::string_view context) { + if (!is_valid_utf8(value)) { + return Status::Corruption("Invalid Parquet VARIANT {} UTF-8 string", context); + } + return Status::OK(); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { + json->push_back('"'); + static constexpr char hex[] = "0123456789abcdef"; + for (unsigned char c : value) { + switch (c) { + case '"': + json->append("\\\""); + break; + case '\\': + json->append("\\\\"); + break; + case '\b': + json->append("\\b"); + break; + case '\f': + json->append("\\f"); + break; + case '\n': + json->append("\\n"); + break; + case '\r': + json->append("\\r"); + break; + case '\t': + json->append("\\t"); + break; + default: + if (c < 0x20) { + json->append("\\u00"); + json->push_back(hex[c >> 4]); + json->push_back(hex[c & 0x0f]); + } else { + json->push_back(static_cast<char>(c)); + } + break; + } + } + json->push_back('"'); +} + +template <typename T> +Status append_floating_json(T value, std::string* json) { + if (!std::isfinite(value)) { + return Status::NotSupported( + "Parquet VARIANT non-finite floating point value is not supported"); + } + std::ostringstream oss; + oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value; + json->append(oss.str()); + return Status::OK(); +} + +std::string int128_to_string(__int128 value) { + if (value == 0) { + return "0"; + } + bool negative = value < 0; + unsigned __int128 unsigned_value = negative ? static_cast<unsigned __int128>(-(value + 1)) + 1 + : static_cast<unsigned __int128>(value); + std::string digits; + while (unsigned_value > 0) { + digits.push_back(static_cast<char>('0' + unsigned_value % 10)); + unsigned_value /= 10; + } + if (negative) { + digits.push_back('-'); + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +void append_decimal_json(__int128 unscaled, int scale, std::string* json) { + std::string value = int128_to_string(unscaled); + bool negative = !value.empty() && value[0] == '-'; + std::string digits = negative ? value.substr(1) : value; + if (scale == 0) { + json->append(value); + return; + } + if (scale > 0) { + if (digits.size() <= static_cast<size_t>(scale)) { + digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), '0'); + } + digits.insert(digits.end() - scale, '.'); + if (negative) { + json->push_back('-'); + } + json->append(digits); + return; + } + if (negative) { + json->push_back('-'); + } + json->append(digits); + json->append(static_cast<size_t>(-scale), '0'); +} + +Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const uint8_t* end, + std::string* json, const uint8_t** next); +Status decode_value(const uint8_t* ptr, const uint8_t* end, const VariantMetadata& metadata, + std::string* json, const uint8_t** next); + +void append_uuid_json(const uint8_t* ptr, std::string* json) { + json->push_back('"'); + json->append(format_variant_uuid(ptr)); + json->push_back('"'); +} + +Status make_jsonb_field(std::string_view json, FieldWithDataType* value) { + JsonBinaryValue jsonb_value; + RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size())); + value->field = + Field::create_field<TYPE_JSONB>(JsonbField(jsonb_value.value(), jsonb_value.size())); + value->base_scalar_type_id = TYPE_JSONB; + return Status::OK(); +} + +Status insert_empty_object_marker(const PathInData& path, VariantMap* values) { + FieldWithDataType value; + RETURN_IF_ERROR(make_jsonb_field("{}", &value)); + (*values)[path] = std::move(value); + return Status::OK(); +} + +Status parse_json_to_variant_map(std::string_view json, const PathInData& prefix, + VariantMap* values) { + auto parsed_column = ColumnVariant::create(0, false); + ParseConfig parse_config; + StringRef json_ref(json.data(), json.size()); + RETURN_IF_CATCH_EXCEPTION( + variant_util::parse_json_to_variant(*parsed_column, json_ref, nullptr, parse_config)); + Field parsed = (*parsed_column)[0]; + if (parsed.is_null()) { Review Comment: Dropping a parsed top-level JSON `null` here loses explicit Parquet VARIANT nulls on the `VariantMap` path. For example, a residual object value like `{"a": null}` enters `decode_object_to_variant_map()` with path `a`; `decode_primitive_to_variant_map()` decodes primitive header 0 to JSON `null`, `parse_json_to_variant_map()` returns without inserting `values[PathInData("a")] = Field()`, and the reconstructed full VARIANT omits `a` instead of preserving `a: null`. A top-level residual `value` encoded as primitive null has the same problem: `value_present` is true but the decoded map is empty, so it is not treated like the required-missing-payload case that explicitly inserts a root Variant null. Please insert a `Field()` at `prefix` when the parsed JSON is null, and add coverage for a residual primitive null both at the root and as an object child. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
