eldenmoon commented on code in PR #63192: URL: https://github.com/apache/doris/pull/63192#discussion_r3234686448
########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,538 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +namespace doris::parquet { + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i < size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end || size > static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { + json->push_back('"'); + static constexpr char hex[] = "0123456789abcdef"; + for (unsigned char c : value) { + switch (c) { + case '"': + json->append("\\\""); + break; + case '\\': + json->append("\\\\"); + break; + case '\b': + json->append("\\b"); + break; + case '\f': + json->append("\\f"); + break; + case '\n': + json->append("\\n"); + break; + case '\r': + json->append("\\r"); + break; + case '\t': + json->append("\\t"); + break; + default: + if (c < 0x20) { + json->append("\\u00"); + json->push_back(hex[c >> 4]); + json->push_back(hex[c & 0x0f]); + } else { + json->push_back(static_cast<char>(c)); + } + break; + } + } + json->push_back('"'); +} + +template <typename T> +void append_floating_json(T value, std::string* json) { + if (!std::isfinite(value)) { + json->append("null"); + return; + } + std::ostringstream oss; + oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value; + json->append(oss.str()); +} + +std::string int128_to_string(__int128 value) { + if (value == 0) { + return "0"; + } + bool negative = value < 0; + unsigned __int128 unsigned_value = negative ? static_cast<unsigned __int128>(-(value + 1)) + 1 + : static_cast<unsigned __int128>(value); + std::string digits; + while (unsigned_value > 0) { + digits.push_back(static_cast<char>('0' + unsigned_value % 10)); + unsigned_value /= 10; + } + if (negative) { + digits.push_back('-'); + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +void append_decimal_json(__int128 unscaled, int scale, std::string* json) { + std::string value = int128_to_string(unscaled); + bool negative = !value.empty() && value[0] == '-'; + std::string digits = negative ? value.substr(1) : value; + if (scale == 0) { + json->append(value); + return; + } + if (scale > 0) { + if (digits.size() <= static_cast<size_t>(scale)) { + digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), '0'); + } + digits.insert(digits.end() - scale, '.'); + if (negative) { + json->push_back('-'); + } + json->append(digits); + return; + } + if (negative) { + json->push_back('-'); + } + json->append(digits); + json->append(static_cast<size_t>(-scale), '0'); +} + +void append_uuid_json(const uint8_t* ptr, std::string* json) { + static constexpr char hex[] = "0123456789abcdef"; + json->push_back('"'); + for (int i = 0; i < 16; ++i) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + json->push_back('-'); + } + json->push_back(hex[ptr[i] >> 4]); + json->push_back(hex[ptr[i] & 0x0f]); + } + json->push_back('"'); +} + +Status decode_metadata(const StringRef& metadata, VariantMetadata* result) { + const auto* ptr = reinterpret_cast<const uint8_t*>(metadata.data); + const auto* end = ptr + metadata.size; + RETURN_IF_ERROR(require_available(ptr, end, 1, "metadata")); + uint8_t header = *ptr++; + uint8_t version = header & 0x0f; + if (version != 1) { + return Status::Corruption("Unsupported Parquet VARIANT metadata version {}", version); + } + int offset_size = ((header >> 6) & 0x03) + 1; + RETURN_IF_ERROR(require_available(ptr, end, offset_size, "metadata dictionary size")); + uint64_t dictionary_size = read_unsigned_le(ptr, offset_size); + ptr += offset_size; + + RETURN_IF_ERROR(require_available_entries(ptr, end, dictionary_size + 1, offset_size, + "metadata dictionary offsets")); + std::vector<uint64_t> offsets(dictionary_size + 1); + for (uint64_t i = 0; i <= dictionary_size; ++i) { + offsets[i] = read_unsigned_le(ptr, offset_size); + ptr += offset_size; + if (i > 0 && offsets[i] < offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT metadata dictionary offsets"); + } + } + + RETURN_IF_ERROR(require_available(ptr, end, offsets.back(), "metadata dictionary bytes")); + result->dictionary.clear(); + result->dictionary.reserve(dictionary_size); + for (uint64_t i = 0; i < dictionary_size; ++i) { + result->dictionary.emplace_back(reinterpret_cast<const char*>(ptr + offsets[i]), + offsets[i + 1] - offsets[i]); + } + return Status::OK(); +} + +Status decode_value(const uint8_t* ptr, const uint8_t* end, const VariantMetadata& metadata, + std::string* json, const uint8_t** next); + +Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const uint8_t* end, + std::string* json, const uint8_t** next) { + switch (primitive_header) { + case 0: + json->append("null"); + *next = ptr; + return Status::OK(); + case 1: + json->append("true"); + *next = ptr; + return Status::OK(); + case 2: + json->append("false"); + *next = ptr; + return Status::OK(); + case 3: + RETURN_IF_ERROR(require_available(ptr, end, 1, "int8 value")); + json->append(std::to_string(static_cast<int8_t>(*ptr))); + *next = ptr + 1; + return Status::OK(); + case 4: + RETURN_IF_ERROR(require_available(ptr, end, 2, "int16 value")); + json->append(std::to_string(read_signed_le(ptr, 2))); + *next = ptr + 2; + return Status::OK(); + case 5: + RETURN_IF_ERROR(require_available(ptr, end, 4, "int32 value")); + json->append(std::to_string(read_signed_le(ptr, 4))); + *next = ptr + 4; + return Status::OK(); + case 6: + RETURN_IF_ERROR(require_available(ptr, end, 8, "int64 value")); + json->append(std::to_string(read_signed_le(ptr, 8))); + *next = ptr + 8; + return Status::OK(); + case 7: { + RETURN_IF_ERROR(require_available(ptr, end, 8, "double value")); + uint64_t bits = read_unsigned_le(ptr, 8); + double value; + std::memcpy(&value, &bits, sizeof(value)); + append_floating_json(value, json); + *next = ptr + 8; + return Status::OK(); + } + case 8: + case 9: + case 10: { + int value_size = primitive_header == 8 ? 4 : primitive_header == 9 ? 8 : 16; + RETURN_IF_ERROR(require_available(ptr, end, 1 + value_size, "decimal value")); + int scale = static_cast<int8_t>(*ptr++); + __int128 unscaled = 0; + if (value_size == 16) { + unsigned __int128 unsigned_value = 0; + for (int i = 15; i >= 0; --i) { + unsigned_value <<= 8; + unsigned_value |= ptr[i]; + } + if ((ptr[15] & 0x80) != 0) { + unscaled = -static_cast<__int128>((~unsigned_value) + 1); + } else { + unscaled = static_cast<__int128>(unsigned_value); + } + } else { + unscaled = read_signed_le(ptr, value_size); + } + append_decimal_json(unscaled, scale, json); + *next = ptr + value_size; + return Status::OK(); + } + case 11: + RETURN_IF_ERROR(require_available(ptr, end, 4, "date value")); + json->append(std::to_string(read_signed_le(ptr, 4))); + *next = ptr + 4; + return Status::OK(); + case 12: + case 13: + case 17: + case 18: + case 19: + RETURN_IF_ERROR(require_available(ptr, end, 8, "time or timestamp value")); + json->append(std::to_string(read_signed_le(ptr, 8))); + *next = ptr + 8; + return Status::OK(); + case 14: { + RETURN_IF_ERROR(require_available(ptr, end, 4, "float value")); + uint32_t bits = static_cast<uint32_t>(read_unsigned_le(ptr, 4)); + float value; + std::memcpy(&value, &bits, sizeof(value)); + append_floating_json(value, json); + *next = ptr + 4; + return Status::OK(); + } + case 15: { + RETURN_IF_ERROR(require_available(ptr, end, 4, "binary length")); + uint64_t size = read_unsigned_le(ptr, 4); + ptr += 4; + RETURN_IF_ERROR(require_available(ptr, end, size, "binary value")); + return Status::NotSupported("Parquet VARIANT binary primitive is not supported"); + } + case 16: { + RETURN_IF_ERROR(require_available(ptr, end, 4, "binary or string length")); + uint64_t size = read_unsigned_le(ptr, 4); + ptr += 4; + RETURN_IF_ERROR(require_available(ptr, end, size, "string value")); + append_json_string( + std::string_view(reinterpret_cast<const char*>(ptr), static_cast<size_t>(size)), + json); + *next = ptr + size; + return Status::OK(); + } + case 20: + RETURN_IF_ERROR(require_available(ptr, end, 16, "uuid value")); + append_uuid_json(ptr, json); + *next = ptr + 16; + return Status::OK(); + default: + return Status::Corruption("Unsupported Parquet VARIANT primitive header {}", + primitive_header); + } +} + +Status decode_object(uint8_t value_header, const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, std::string* json, const uint8_t** next) { + int field_offset_size = (value_header & 0x03) + 1; + int field_id_size = ((value_header >> 2) & 0x03) + 1; + int num_elements_size = (value_header & 0x10) != 0 ? 4 : 1; + + RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "object element count")); + uint64_t num_elements = read_unsigned_le(ptr, num_elements_size); + ptr += num_elements_size; + + RETURN_IF_ERROR( + require_available_entries(ptr, end, num_elements, field_id_size, "object field ids")); + std::vector<uint64_t> field_ids(num_elements); + for (uint64_t i = 0; i < num_elements; ++i) { Review Comment: Fixed in the latest push by rejecting non-strictly-increasing Parquet VARIANT object field ids during decode_object(). Added BE unit coverage for duplicate and out-of-order field ids via ParquetVariantReaderTest.* and verified locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
