eldenmoon commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3234849726


##########
be/src/format/parquet/parquet_variant_reader.cpp:
##########
@@ -0,0 +1,541 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/parquet/parquet_variant_reader.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstring>
+#include <iomanip>
+#include <limits>
+#include <sstream>
+#include <string_view>
+#include <vector>
+
+namespace doris::parquet {
+
+namespace {
+
+struct VariantMetadata {
+    std::vector<std::string> dictionary;
+};
+
+uint64_t read_unsigned_le(const uint8_t* ptr, int size) {
+    uint64_t value = 0;
+    for (int i = 0; i < size; ++i) {
+        value |= static_cast<uint64_t>(ptr[i]) << (i * 8);
+    }
+    return value;
+}
+
+int64_t read_signed_le(const uint8_t* ptr, int size) {
+    uint64_t value = read_unsigned_le(ptr, size);
+    if (size < 8) {
+        uint64_t sign_bit = uint64_t {1} << (size * 8 - 1);
+        if ((value & sign_bit) != 0) {
+            uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1);
+            value |= mask;
+        }
+    }
+    return static_cast<int64_t>(value);
+}
+
+Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size,
+                         std::string_view context) {
+    if (ptr > end || size > static_cast<size_t>(end - ptr)) {
+        return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context);
+    }
+    return Status::OK();
+}
+
+Status require_available_entries(const uint8_t* ptr, const uint8_t* end, 
uint64_t entries,
+                                 size_t entry_size, std::string_view context) {
+    if (entries > std::numeric_limits<size_t>::max() / entry_size) {
+        return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context);
+    }
+    return require_available(ptr, end, static_cast<size_t>(entries) * 
entry_size, context);
+}
+
+Status validate_array_field_offsets(const std::vector<uint64_t>& 
field_offsets, uint64_t total_size,
+                                    std::string_view context) {
+    if (field_offsets.empty() || field_offsets.front() != 0) {
+        return Status::Corruption("Invalid Parquet VARIANT {} field offsets", 
context);
+    }
+    for (size_t i = 0; i < field_offsets.size(); ++i) {
+        if (field_offsets[i] > total_size) {
+            return Status::Corruption("Invalid Parquet VARIANT {} field offset 
{}", context,
+                                      field_offsets[i]);
+        }
+        if (i > 0 && field_offsets[i] < field_offsets[i - 1]) {
+            return Status::Corruption("Invalid Parquet VARIANT {} field 
offsets", context);
+        }
+    }
+    return Status::OK();
+}
+
+Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, 
uint64_t total_size,
+                                 std::vector<uint64_t>* field_ends) {
+    if (field_offsets.empty()) {
+        return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+    }
+    size_t num_elements = field_offsets.size() - 1;
+    if (num_elements == 0) {
+        if (total_size != 0) {
+            return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+        }
+        return Status::OK();
+    }
+
+    std::vector<std::pair<uint64_t, size_t>> physical_offsets;
+    physical_offsets.reserve(num_elements);
+    for (size_t i = 0; i < num_elements; ++i) {
+        if (field_offsets[i] >= total_size) {
+            return Status::Corruption("Invalid Parquet VARIANT object field 
offset {}",
+                                      field_offsets[i]);
+        }
+        physical_offsets.emplace_back(field_offsets[i], i);
+    }
+    std::sort(physical_offsets.begin(), physical_offsets.end());
+    if (physical_offsets.front().first != 0) {
+        return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+    }
+
+    field_ends->assign(num_elements, 0);
+    for (size_t i = 0; i < physical_offsets.size(); ++i) {
+        if (i > 0 && physical_offsets[i].first == physical_offsets[i - 
1].first) {
+            return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+        }
+        uint64_t child_end =
+                i + 1 < physical_offsets.size() ? physical_offsets[i + 
1].first : total_size;
+        (*field_ends)[physical_offsets[i].second] = child_end;
+    }
+    return Status::OK();
+}
+
+void append_json_string(std::string_view value, std::string* json) {
+    json->push_back('"');
+    static constexpr char hex[] = "0123456789abcdef";
+    for (unsigned char c : value) {
+        switch (c) {
+        case '"':
+            json->append("\\\"");
+            break;
+        case '\\':
+            json->append("\\\\");
+            break;
+        case '\b':
+            json->append("\\b");
+            break;
+        case '\f':
+            json->append("\\f");
+            break;
+        case '\n':
+            json->append("\\n");
+            break;
+        case '\r':
+            json->append("\\r");
+            break;
+        case '\t':
+            json->append("\\t");
+            break;
+        default:
+            if (c < 0x20) {
+                json->append("\\u00");
+                json->push_back(hex[c >> 4]);
+                json->push_back(hex[c & 0x0f]);
+            } else {
+                json->push_back(static_cast<char>(c));
+            }
+            break;
+        }
+    }
+    json->push_back('"');
+}
+
+template <typename T>
+void append_floating_json(T value, std::string* json) {
+    if (!std::isfinite(value)) {
+        json->append("null");
+        return;
+    }
+    std::ostringstream oss;
+    oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value;
+    json->append(oss.str());
+}
+
+std::string int128_to_string(__int128 value) {
+    if (value == 0) {
+        return "0";
+    }
+    bool negative = value < 0;
+    unsigned __int128 unsigned_value = negative ? static_cast<unsigned 
__int128>(-(value + 1)) + 1
+                                                : static_cast<unsigned 
__int128>(value);
+    std::string digits;
+    while (unsigned_value > 0) {
+        digits.push_back(static_cast<char>('0' + unsigned_value % 10));
+        unsigned_value /= 10;
+    }
+    if (negative) {
+        digits.push_back('-');
+    }
+    std::reverse(digits.begin(), digits.end());
+    return digits;
+}
+
+void append_decimal_json(__int128 unscaled, int scale, std::string* json) {
+    std::string value = int128_to_string(unscaled);
+    bool negative = !value.empty() && value[0] == '-';
+    std::string digits = negative ? value.substr(1) : value;
+    if (scale == 0) {
+        json->append(value);
+        return;
+    }
+    if (scale > 0) {
+        if (digits.size() <= static_cast<size_t>(scale)) {
+            digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), 
'0');
+        }
+        digits.insert(digits.end() - scale, '.');
+        if (negative) {
+            json->push_back('-');
+        }
+        json->append(digits);
+        return;
+    }
+    if (negative) {
+        json->push_back('-');
+    }
+    json->append(digits);
+    json->append(static_cast<size_t>(-scale), '0');
+}
+
+void append_uuid_json(const uint8_t* ptr, std::string* json) {
+    static constexpr char hex[] = "0123456789abcdef";
+    json->push_back('"');
+    for (int i = 0; i < 16; ++i) {
+        if (i == 4 || i == 6 || i == 8 || i == 10) {
+            json->push_back('-');
+        }
+        json->push_back(hex[ptr[i] >> 4]);
+        json->push_back(hex[ptr[i] & 0x0f]);
+    }
+    json->push_back('"');
+}
+
+Status decode_metadata(const StringRef& metadata, VariantMetadata* result) {
+    const auto* ptr = reinterpret_cast<const uint8_t*>(metadata.data);
+    const auto* end = ptr + metadata.size;
+    RETURN_IF_ERROR(require_available(ptr, end, 1, "metadata"));
+    uint8_t header = *ptr++;
+    uint8_t version = header & 0x0f;
+    if (version != 1) {
+        return Status::Corruption("Unsupported Parquet VARIANT metadata 
version {}", version);
+    }
+    int offset_size = ((header >> 6) & 0x03) + 1;
+    RETURN_IF_ERROR(require_available(ptr, end, offset_size, "metadata 
dictionary size"));
+    uint64_t dictionary_size = read_unsigned_le(ptr, offset_size);
+    ptr += offset_size;
+
+    RETURN_IF_ERROR(require_available_entries(ptr, end, dictionary_size + 1, 
offset_size,
+                                              "metadata dictionary offsets"));
+    std::vector<uint64_t> offsets(dictionary_size + 1);
+    for (uint64_t i = 0; i <= dictionary_size; ++i) {
+        offsets[i] = read_unsigned_le(ptr, offset_size);
+        ptr += offset_size;
+        if (i > 0 && offsets[i] < offsets[i - 1]) {
+            return Status::Corruption("Invalid Parquet VARIANT metadata 
dictionary offsets");
+        }
+    }
+
+    RETURN_IF_ERROR(require_available(ptr, end, offsets.back(), "metadata 
dictionary bytes"));
+    result->dictionary.clear();
+    result->dictionary.reserve(dictionary_size);
+    for (uint64_t i = 0; i < dictionary_size; ++i) {
+        result->dictionary.emplace_back(reinterpret_cast<const char*>(ptr + 
offsets[i]),
+                                        offsets[i + 1] - offsets[i]);

Review Comment:
   Fixed in the latest push by rejecting duplicate Parquet VARIANT metadata 
dictionary keys during metadata decode. Added 
ParquetVariantReaderTest.RejectDuplicateMetadataDictionaryKeys and verified 
ParquetVariantReaderTest.* locally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to