github-actions[bot] commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3231490657


##########
be/src/format/parquet/parquet_variant_reader.cpp:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/parquet/parquet_variant_reader.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstring>
+#include <iomanip>
+#include <limits>
+#include <sstream>
+#include <string_view>
+#include <vector>
+
+namespace doris::parquet {
+
+namespace {
+
// Decoded Parquet VARIANT metadata. Only the key dictionary is kept; object field ids are
// indices into `dictionary`.
struct VariantMetadata {
    // Dictionary strings in encoded order.
    std::vector<std::string> dictionary {};
};
+
// Reads `size` bytes starting at `ptr` as an unsigned little-endian integer.
// No bounds checking is done here; the visible callers pass widths of 1..8 bytes after
// verifying availability.
uint64_t read_unsigned_le(const uint8_t* ptr, int size) {
    uint64_t result = 0;
    // Walk from the most significant byte down, shifting earlier bytes up as we go.
    for (int byte_index = size - 1; byte_index >= 0; --byte_index) {
        result = (result << 8) | ptr[byte_index];
    }
    return result;
}
+
+int64_t read_signed_le(const uint8_t* ptr, int size) {
+    uint64_t value = read_unsigned_le(ptr, size);
+    if (size < 8) {
+        uint64_t sign_bit = uint64_t {1} << (size * 8 - 1);
+        if ((value & sign_bit) != 0) {
+            uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1);
+            value |= mask;
+        }
+    }
+    return static_cast<int64_t>(value);
+}
+
+Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size,
+                         std::string_view context) {
+    if (ptr > end || size > static_cast<size_t>(end - ptr)) {
+        return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context);
+    }
+    return Status::OK();
+}
+
+Status require_available_entries(const uint8_t* ptr, const uint8_t* end, 
uint64_t entries,
+                                 size_t entry_size, std::string_view context) {
+    if (entries > std::numeric_limits<size_t>::max() / entry_size) {
+        return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context);
+    }
+    return require_available(ptr, end, static_cast<size_t>(entries) * 
entry_size, context);
+}
+
+Status validate_field_offsets(const std::vector<uint64_t>& field_offsets, 
uint64_t total_size,
+                              std::string_view context) {
+    if (field_offsets.empty() || field_offsets.front() != 0) {
+        return Status::Corruption("Invalid Parquet VARIANT {} field offsets", 
context);
+    }
+    for (size_t i = 0; i < field_offsets.size(); ++i) {
+        if (field_offsets[i] > total_size) {
+            return Status::Corruption("Invalid Parquet VARIANT {} field offset 
{}", context,
+                                      field_offsets[i]);
+        }
+        if (i > 0 && field_offsets[i] < field_offsets[i - 1]) {
+            return Status::Corruption("Invalid Parquet VARIANT {} field 
offsets", context);
+        }
+    }
+    return Status::OK();
+}
+
// Appends `value` to `json` as a double-quoted JSON string literal.
// - Quote, backslash, \b, \f, \n, \r and \t get their two-character escapes.
// - Any other byte below 0x20 becomes \u00XX.
// - All remaining bytes, including non-ASCII ones, are copied unchanged.
void append_json_string(std::string_view value, std::string* json) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    json->push_back('"');
    for (const char ch : value) {
        const auto byte = static_cast<unsigned char>(ch);
        const char* short_escape = nullptr;
        switch (byte) {
        case '"':
            short_escape = "\\\"";
            break;
        case '\\':
            short_escape = "\\\\";
            break;
        case '\b':
            short_escape = "\\b";
            break;
        case '\f':
            short_escape = "\\f";
            break;
        case '\n':
            short_escape = "\\n";
            break;
        case '\r':
            short_escape = "\\r";
            break;
        case '\t':
            short_escape = "\\t";
            break;
        default:
            break;
        }
        if (short_escape != nullptr) {
            json->append(short_escape);
        } else if (byte < 0x20) {
            json->append("\\u00");
            json->push_back(kHexDigits[byte >> 4]);
            json->push_back(kHexDigits[byte & 0x0f]);
        } else {
            json->push_back(ch);
        }
    }
    json->push_back('"');
}
+
// Appends a floating-point value to `json`. Finite values are printed with
// max_digits10 significant digits, so the value round-trips. NaN and infinities have no JSON
// representation and are written as `null`.
template <typename T>
void append_floating_json(T value, std::string* json) {
    if (std::isfinite(value)) {
        std::ostringstream formatted;
        formatted.precision(std::numeric_limits<T>::max_digits10);
        formatted << value;
        *json += formatted.str();
    } else {
        *json += "null";
    }
}
+
// Formats a signed 128-bit integer in base 10 (std::to_string has no __int128 overload).
std::string int128_to_string(__int128 value) {
    if (value == 0) {
        return "0";
    }
    const bool is_negative = value < 0;
    // Negate on the unsigned type, where wrap-around is well defined, so INT128_MIN is safe.
    unsigned __int128 magnitude = static_cast<unsigned __int128>(value);
    if (is_negative) {
        magnitude = ~magnitude + 1;
    }
    // 2^127 has 39 decimal digits; one extra slot holds the sign.
    char buffer[40];
    char* cursor = buffer + sizeof(buffer);
    while (magnitude != 0) {
        *--cursor = static_cast<char>('0' + static_cast<int>(magnitude % 10));
        magnitude /= 10;
    }
    if (is_negative) {
        *--cursor = '-';
    }
    return std::string(cursor, buffer + sizeof(buffer));
}
+
+void append_decimal_json(__int128 unscaled, int scale, std::string* json) {
+    std::string value = int128_to_string(unscaled);
+    bool negative = !value.empty() && value[0] == '-';
+    std::string digits = negative ? value.substr(1) : value;
+    if (scale == 0) {
+        json->append(value);
+        return;
+    }
+    if (scale > 0) {
+        if (digits.size() <= static_cast<size_t>(scale)) {
+            digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), 
'0');
+        }
+        digits.insert(digits.end() - scale, '.');
+        if (negative) {
+            json->push_back('-');
+        }
+        json->append(digits);
+        return;
+    }
+    if (negative) {
+        json->push_back('-');
+    }
+    json->append(digits);
+    json->append(static_cast<size_t>(-scale), '0');
+}
+
// Appends the 16 bytes at `ptr`, in the order stored, as a quoted lowercase UUID string in
// the 8-4-4-4-12 hex-digit layout.
void append_uuid_json(const uint8_t* ptr, std::string* json) {
    static constexpr char kHex[] = "0123456789abcdef";
    // Byte counts of the five dash-separated groups (8-4-4-4-12 hex digits).
    static constexpr int kGroupBytes[] = {4, 2, 2, 2, 6};
    json->push_back('"');
    const uint8_t* byte = ptr;
    bool first_group = true;
    for (const int group_bytes : kGroupBytes) {
        if (!first_group) {
            json->push_back('-');
        }
        first_group = false;
        for (int k = 0; k < group_bytes; ++k, ++byte) {
            json->push_back(kHex[*byte >> 4]);
            json->push_back(kHex[*byte & 0x0f]);
        }
    }
    json->push_back('"');
}
+
+Status decode_metadata(const StringRef& metadata, VariantMetadata* result) {
+    const auto* ptr = reinterpret_cast<const uint8_t*>(metadata.data);
+    const auto* end = ptr + metadata.size;
+    RETURN_IF_ERROR(require_available(ptr, end, 1, "metadata"));
+    uint8_t header = *ptr++;
+    uint8_t version = header & 0x0f;
+    if (version != 1) {
+        return Status::Corruption("Unsupported Parquet VARIANT metadata 
version {}", version);
+    }
+    int offset_size = ((header >> 6) & 0x03) + 1;
+    RETURN_IF_ERROR(require_available(ptr, end, offset_size, "metadata 
dictionary size"));
+    uint64_t dictionary_size = read_unsigned_le(ptr, offset_size);
+    ptr += offset_size;
+
+    RETURN_IF_ERROR(require_available_entries(ptr, end, dictionary_size + 1, 
offset_size,
+                                              "metadata dictionary offsets"));
+    std::vector<uint64_t> offsets(dictionary_size + 1);
+    for (uint64_t i = 0; i <= dictionary_size; ++i) {
+        offsets[i] = read_unsigned_le(ptr, offset_size);
+        ptr += offset_size;
+        if (i > 0 && offsets[i] < offsets[i - 1]) {
+            return Status::Corruption("Invalid Parquet VARIANT metadata 
dictionary offsets");
+        }
+    }
+
+    RETURN_IF_ERROR(require_available(ptr, end, offsets.back(), "metadata 
dictionary bytes"));
+    result->dictionary.clear();
+    result->dictionary.reserve(dictionary_size);
+    for (uint64_t i = 0; i < dictionary_size; ++i) {
+        result->dictionary.emplace_back(reinterpret_cast<const char*>(ptr + 
offsets[i]),
+                                        offsets[i + 1] - offsets[i]);
+    }
+    return Status::OK();
+}
+
// Forward declaration; the definition is not part of this excerpt.
// Presumably it decodes one encoded VARIANT value starting at `ptr` (bounded by `end`),
// appends its JSON to `json`, and reports the byte after the value via `next`. TODO confirm
// against the definition.
Status decode_value(const uint8_t* ptr, const uint8_t* end, const VariantMetadata& metadata,
                    std::string* json, const uint8_t** next);
+
+Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const 
uint8_t* end,
+                        std::string* json, const uint8_t** next) {
+    switch (primitive_header) {
+    case 0:
+        json->append("null");
+        *next = ptr;
+        return Status::OK();
+    case 1:
+        json->append("true");
+        *next = ptr;
+        return Status::OK();
+    case 2:
+        json->append("false");
+        *next = ptr;
+        return Status::OK();
+    case 3:
+        RETURN_IF_ERROR(require_available(ptr, end, 1, "int8 value"));
+        json->append(std::to_string(static_cast<int8_t>(*ptr)));
+        *next = ptr + 1;
+        return Status::OK();
+    case 4:
+        RETURN_IF_ERROR(require_available(ptr, end, 2, "int16 value"));
+        json->append(std::to_string(read_signed_le(ptr, 2)));
+        *next = ptr + 2;
+        return Status::OK();
+    case 5:
+        RETURN_IF_ERROR(require_available(ptr, end, 4, "int32 value"));
+        json->append(std::to_string(read_signed_le(ptr, 4)));
+        *next = ptr + 4;
+        return Status::OK();
+    case 6:
+        RETURN_IF_ERROR(require_available(ptr, end, 8, "int64 value"));
+        json->append(std::to_string(read_signed_le(ptr, 8)));
+        *next = ptr + 8;
+        return Status::OK();
+    case 7: {
+        RETURN_IF_ERROR(require_available(ptr, end, 8, "double value"));
+        uint64_t bits = read_unsigned_le(ptr, 8);
+        double value;
+        std::memcpy(&value, &bits, sizeof(value));
+        append_floating_json(value, json);
+        *next = ptr + 8;
+        return Status::OK();
+    }
+    case 8:
+    case 9:
+    case 10: {
+        int value_size = primitive_header == 8 ? 4 : primitive_header == 9 ? 8 
: 16;
+        RETURN_IF_ERROR(require_available(ptr, end, 1 + value_size, "decimal 
value"));
+        int scale = static_cast<int8_t>(*ptr++);
+        __int128 unscaled = 0;
+        if (value_size == 16) {
+            unsigned __int128 unsigned_value = 0;
+            for (int i = 15; i >= 0; --i) {
+                unsigned_value <<= 8;
+                unsigned_value |= ptr[i];
+            }
+            if ((ptr[15] & 0x80) != 0) {
+                unscaled = -static_cast<__int128>((~unsigned_value) + 1);
+            } else {
+                unscaled = static_cast<__int128>(unsigned_value);
+            }
+        } else {
+            unscaled = read_signed_le(ptr, value_size);
+        }
+        append_decimal_json(unscaled, scale, json);
+        *next = ptr + value_size;
+        return Status::OK();
+    }
+    case 11:
+        RETURN_IF_ERROR(require_available(ptr, end, 4, "date value"));
+        json->append(std::to_string(read_signed_le(ptr, 4)));
+        *next = ptr + 4;
+        return Status::OK();
+    case 12:
+    case 13:
+    case 17:
+    case 18:
+    case 19:
+        RETURN_IF_ERROR(require_available(ptr, end, 8, "time or timestamp 
value"));
+        json->append(std::to_string(read_signed_le(ptr, 8)));
+        *next = ptr + 8;
+        return Status::OK();
+    case 14: {
+        RETURN_IF_ERROR(require_available(ptr, end, 4, "float value"));
+        uint32_t bits = static_cast<uint32_t>(read_unsigned_le(ptr, 4));
+        float value;
+        std::memcpy(&value, &bits, sizeof(value));
+        append_floating_json(value, json);
+        *next = ptr + 4;
+        return Status::OK();
+    }
+    case 15: {
+        RETURN_IF_ERROR(require_available(ptr, end, 4, "binary length"));
+        uint64_t size = read_unsigned_le(ptr, 4);
+        ptr += 4;
+        RETURN_IF_ERROR(require_available(ptr, end, size, "binary value"));
+        return Status::NotSupported("Parquet VARIANT binary primitive is not 
supported");
+    }
+    case 16: {
+        RETURN_IF_ERROR(require_available(ptr, end, 4, "binary or string 
length"));
+        uint64_t size = read_unsigned_le(ptr, 4);
+        ptr += 4;
+        RETURN_IF_ERROR(require_available(ptr, end, size, "string value"));
+        append_json_string(
+                std::string_view(reinterpret_cast<const char*>(ptr), 
static_cast<size_t>(size)),
+                json);
+        *next = ptr + size;
+        return Status::OK();
+    }
+    case 20:
+        RETURN_IF_ERROR(require_available(ptr, end, 16, "uuid value"));
+        append_uuid_json(ptr, json);
+        *next = ptr + 16;
+        return Status::OK();
+    default:
+        return Status::Corruption("Unsupported Parquet VARIANT primitive 
header {}",
+                                  primitive_header);
+    }
+}
+
+Status decode_object(uint8_t value_header, const uint8_t* ptr, const uint8_t* 
end,
+                     const VariantMetadata& metadata, std::string* json, const 
uint8_t** next) {
+    int field_offset_size = (value_header & 0x03) + 1;
+    int field_id_size = ((value_header >> 2) & 0x03) + 1;
+    int num_elements_size = (value_header & 0x10) != 0 ? 4 : 1;
+
+    RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "object 
element count"));
+    uint64_t num_elements = read_unsigned_le(ptr, num_elements_size);
+    ptr += num_elements_size;
+
+    RETURN_IF_ERROR(
+            require_available_entries(ptr, end, num_elements, field_id_size, 
"object field ids"));
+    std::vector<uint64_t> field_ids(num_elements);
+    for (uint64_t i = 0; i < num_elements; ++i) {
+        field_ids[i] = read_unsigned_le(ptr, field_id_size);
+        ptr += field_id_size;
+        if (field_ids[i] >= metadata.dictionary.size()) {
+            return Status::Corruption("Invalid Parquet VARIANT object field id 
{}", field_ids[i]);
+        }
+    }
+
+    RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, 
field_offset_size,
+                                      "object field offsets"));
+    std::vector<uint64_t> field_offsets(num_elements + 1);
+    for (uint64_t i = 0; i <= num_elements; ++i) {
+        field_offsets[i] = read_unsigned_le(ptr, field_offset_size);
+        ptr += field_offset_size;
+    }
+
+    uint64_t total_size = field_offsets.back();
+    const uint8_t* fields = ptr;
+    RETURN_IF_ERROR(require_available(fields, end, total_size, "object field 
values"));
+    RETURN_IF_ERROR(validate_field_offsets(field_offsets, total_size, 
"object"));

Review Comment:
   This rejects valid Parquet VARIANT objects. The VARIANT encoding requires field IDs
   to be sorted by key, but it explicitly states that the encoded values do not need to
   be stored in that same order, so an object's `field_offset` entries may be
   non-monotonic. With offsets like `[4, 2, 0, 6]` for sorted keys whose values are laid
   out in a different byte order, `validate_field_offsets()` returns a corruption error
   here, and the `child_end = field_offsets[i + 1]` logic below cannot decode the value.
   Arrays need monotonic offsets, but objects need separate handling:
   - validate that each offset is within `total_size`;
   - decode each value starting from its own offset using the full object payload (or
     compute per-value bounds from the sorted set of offsets);
   - verify the decoded value boundaries without requiring adjacent offsets to increase.
   Please add a unit/regression test for an object whose values are stored out of
   field-id order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to