github-actions[bot] commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3229990011
##########
be/src/format/parquet/vparquet_column_reader.cpp:
##########
@@ -1001,6 +1125,382 @@ Status StructColumnReader::read_column_data(
return Status::OK();
}
+Status VariantColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+ const tparquet::RowGroup& row_group, size_t
max_buf_size,
+ std::unordered_map<int,
tparquet::OffsetIndex>& col_offsets,
+ RuntimeState* state, bool in_collection,
+ const std::set<uint64_t>& column_ids,
+ const std::set<uint64_t>& filter_column_ids) {
+ _field_schema = field;
+ _variant_struct_field = std::make_unique<FieldSchema>(*field);
+
+ DataTypes child_types;
+ Strings child_names;
+ child_types.reserve(field->children.size());
+ child_names.reserve(field->children.size());
+ for (const auto& child : field->children) {
+ child_types.push_back(make_nullable(child.data_type));
+ child_names.push_back(child.name);
+ }
+ _variant_struct_type = std::make_shared<DataTypeStruct>(child_types,
child_names);
+ if (field->data_type->is_nullable()) {
+ _variant_struct_type = make_nullable(_variant_struct_type);
+ }
+ _variant_struct_field->data_type = _variant_struct_type;
+
+ RETURN_IF_ERROR(ParquetColumnReader::create(file,
_variant_struct_field.get(), row_group,
+ _row_ranges, _ctz, _io_ctx,
_struct_reader,
+ max_buf_size, col_offsets,
state, in_collection,
+ column_ids,
filter_column_ids));
+ _struct_reader->set_column_in_nested();
+ return Status::OK();
+}
+
+Status VariantColumnReader::_get_binary_field(const Field& field, std::string*
value,
+ bool* present) const {
+ if (field.is_null()) {
+ *present = false;
+ return Status::OK();
+ }
+ *present = true;
+ switch (field.get_type()) {
+ case TYPE_STRING:
+ *value = field.get<TYPE_STRING>();
+ return Status::OK();
+ case TYPE_CHAR:
+ *value = field.get<TYPE_CHAR>();
+ return Status::OK();
+ case TYPE_VARCHAR:
+ *value = field.get<TYPE_VARCHAR>();
+ return Status::OK();
+ case TYPE_VARBINARY: {
+ auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
+ value->assign(ref.data, ref.size);
+ return Status::OK();
+ }
+ default:
+ return Status::Corruption("Parquet VARIANT binary field has unexpected
Doris type {}",
+ field.get_type_name());
+ }
+}
+
+Status VariantColumnReader::_field_to_json(const FieldSchema& field_schema,
const Field& field,
+ std::string* json, bool* present)
const {
+ if (field.is_null()) {
+ *present = false;
+ return Status::OK();
+ }
+ *present = true;
+ const DataTypePtr& type = remove_nullable(field_schema.data_type);
+ switch (type->get_primitive_type()) {
+ case TYPE_BOOLEAN:
+ case TYPE_TINYINT:
+ case TYPE_SMALLINT:
+ case TYPE_INT:
+ case TYPE_BIGINT:
+ case TYPE_LARGEINT:
+ case TYPE_DECIMALV2:
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I:
+ case TYPE_DECIMAL256:
+ json->append(field.to_debug_string(type->get_scale()));
+ return Status::OK();
+ case TYPE_FLOAT: {
+ const auto value = field.get<TYPE_FLOAT>();
+ json->append(std::isfinite(value) ?
field.to_debug_string(type->get_scale()) : "null");
+ return Status::OK();
+ }
+ case TYPE_DOUBLE: {
+ const auto value = field.get<TYPE_DOUBLE>();
+ json->append(std::isfinite(value) ?
field.to_debug_string(type->get_scale()) : "null");
+ return Status::OK();
+ }
+ case TYPE_TIMEV2:
+ json->append(std::to_string(field.get<TYPE_TIMEV2>()));
+ return Status::OK();
+ case TYPE_DATE:
+ case TYPE_DATETIME:
Review Comment:
Shredded temporal typed values do not decode the same way as the unshredded
VARIANT `value` path. Here DATE/DATETIME/TIMESTAMP fields are converted through
`Field::to_debug_string()` and quoted as JSON strings (while TIMEV2 is emitted
as a numeric value), but `decode_primitive()` emits Parquet VARIANT
date/timestamp/time primitives as their raw numeric payloads. A valid file can
store the same logical field in `value` for some rows and in `typed_value` for
others, so Doris will materialize different `ColumnVariant` types depending
only on shredding layout. Please normalize temporal primitives consistently
across both paths and add a regression that reads the same DATE/TIME/TIMESTAMP
value from unshredded and shredded VARIANT columns.
##########
be/src/format/parquet/parquet_variant_reader.cpp:
##########
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/parquet/parquet_variant_reader.h"
+
#include <algorithm>
#include <cmath>
#include <cstring>
#include <iomanip>
#include <limits>
#include <locale>
#include <sstream>
#include <string_view>
#include <vector>
+
+namespace doris::parquet {
+
+namespace {
+
// Result of decoding a Parquet VARIANT metadata buffer (see decode_metadata): the
// dictionary strings, stored in the same order as they appear in the buffer.
struct VariantMetadata {
    std::vector<std::string> dictionary {};
};
+
// Reads a `size`-byte little-endian unsigned integer starting at `ptr`.
// Expects 0 <= size <= 8; size 0 yields 0.
uint64_t read_unsigned_le(const uint8_t* ptr, int size) {
    uint64_t result = 0;
    // Start from the most significant (last) byte and shift earlier bytes in below it.
    for (int pos = size - 1; pos >= 0; --pos) {
        result = (result << 8) | ptr[pos];
    }
    return result;
}
+
+int64_t read_signed_le(const uint8_t* ptr, int size) {
+ uint64_t value = read_unsigned_le(ptr, size);
+ if (size < 8) {
+ uint64_t sign_bit = uint64_t {1} << (size * 8 - 1);
+ if ((value & sign_bit) != 0) {
+ uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1);
+ value |= mask;
+ }
+ }
+ return static_cast<int64_t>(value);
+}
+
+Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size,
+ std::string_view context) {
+ if (ptr + size > end) {
+ return Status::Corruption("Invalid Parquet VARIANT {} encoding",
context);
+ }
+ return Status::OK();
+}
+
// Appends `value` to `*json` as a double-quoted JSON string literal.
//
// Quote, backslash and the control characters with short JSON escapes
// (\b \f \n \r \t) use those escapes. Every other byte below 0x20 becomes \u00XX with
// lowercase hex. All remaining bytes, including bytes >= 0x80, are copied unchanged.
void append_json_string(std::string_view value, std::string* json) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    json->push_back('"');
    for (const char raw : value) {
        const auto byte = static_cast<unsigned char>(raw);
        const char* escape = nullptr;
        switch (byte) {
        case '"': escape = "\\\""; break;
        case '\\': escape = "\\\\"; break;
        case '\b': escape = "\\b"; break;
        case '\f': escape = "\\f"; break;
        case '\n': escape = "\\n"; break;
        case '\r': escape = "\\r"; break;
        case '\t': escape = "\\t"; break;
        default: break;
        }
        if (escape != nullptr) {
            json->append(escape);
        } else if (byte < 0x20) {
            const char unicode_escape[] = {'\\', 'u', '0', '0', kHexDigits[byte >> 4],
                                           kHexDigits[byte & 0x0f]};
            json->append(unicode_escape, sizeof(unicode_escape));
        } else {
            json->push_back(raw);
        }
    }
    json->push_back('"');
}
+
// Appends `value` to `*json` as a JSON number. NaN and +/-Inf become `null`, because
// JSON has no literal for non-finite values. max_digits10 significant digits are used
// so the text round-trips to the same binary value.
//
// The stream is imbued with the classic "C" locale so the output never depends on the
// process-global C++ locale. A global locale with a ',' decimal point or digit grouping
// would otherwise produce text such as "1.234,5", which is not valid JSON.
template <typename T>
void append_floating_json(T value, std::string* json) {
    if (!std::isfinite(value)) {
        json->append("null");
        return;
    }
    std::ostringstream oss;
    oss.imbue(std::locale::classic());
    oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value;
    json->append(oss.str());
}
+
// Formats a signed 128-bit integer in base 10, with a leading '-' for negative values.
// The full range is handled, including the minimum value, whose magnitude does not fit
// in a signed __int128.
std::string int128_to_string(__int128 value) {
    // 39 digits cover 2^127; one extra slot for the sign.
    char buffer[41];
    char* const buffer_end = buffer + sizeof(buffer);
    char* cursor = buffer_end;

    const bool is_negative = value < 0;
    // Two's-complement magnitude, computed in unsigned arithmetic to avoid overflow.
    unsigned __int128 magnitude = is_negative ? ~static_cast<unsigned __int128>(value) + 1
                                              : static_cast<unsigned __int128>(value);
    do {
        *--cursor = static_cast<char>('0' + static_cast<int>(magnitude % 10));
        magnitude /= 10;
    } while (magnitude != 0);

    if (is_negative) {
        *--cursor = '-';
    }
    return std::string(cursor, buffer_end);
}
+
+void append_decimal_json(__int128 unscaled, int scale, std::string* json) {
+ std::string value = int128_to_string(unscaled);
+ bool negative = !value.empty() && value[0] == '-';
+ std::string digits = negative ? value.substr(1) : value;
+ if (scale == 0) {
+ json->append(value);
+ return;
+ }
+ if (scale > 0) {
+ if (digits.size() <= static_cast<size_t>(scale)) {
+ digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(),
'0');
+ }
+ digits.insert(digits.end() - scale, '.');
+ if (negative) {
+ json->push_back('-');
+ }
+ json->append(digits);
+ return;
+ }
+ if (negative) {
+ json->push_back('-');
+ }
+ json->append(digits);
+ json->append(static_cast<size_t>(-scale), '0');
+}
+
// Appends the 16 bytes at `ptr` to `*json` as a quoted UUID string, in canonical
// 8-4-4-4-12 lowercase hex form (bytes emitted in storage order).
void append_uuid_json(const uint8_t* ptr, std::string* json) {
    static constexpr char kHex[] = "0123456789abcdef";
    // Bytes per dash-separated group: 4-2-2-2-6 bytes == 8-4-4-4-12 hex digits.
    static constexpr int kGroupBytes[] = {4, 2, 2, 2, 6};

    json->push_back('"');
    const uint8_t* byte = ptr;
    bool first_group = true;
    for (const int group_len : kGroupBytes) {
        if (!first_group) {
            json->push_back('-');
        }
        first_group = false;
        for (int k = 0; k < group_len; ++k, ++byte) {
            json->push_back(kHex[*byte >> 4]);
            json->push_back(kHex[*byte & 0x0f]);
        }
    }
    json->push_back('"');
}
+
+Status decode_metadata(const StringRef& metadata, VariantMetadata* result) {
+ const auto* ptr = reinterpret_cast<const uint8_t*>(metadata.data);
+ const auto* end = ptr + metadata.size;
+ RETURN_IF_ERROR(require_available(ptr, end, 1, "metadata"));
+ uint8_t header = *ptr++;
+ uint8_t version = header & 0x0f;
+ if (version != 1) {
+ return Status::Corruption("Unsupported Parquet VARIANT metadata
version {}", version);
+ }
+ int offset_size = ((header >> 6) & 0x03) + 1;
+ RETURN_IF_ERROR(require_available(ptr, end, offset_size, "metadata
dictionary size"));
+ uint64_t dictionary_size = read_unsigned_le(ptr, offset_size);
+ ptr += offset_size;
+
+ size_t offsets_bytes = (dictionary_size + 1) * offset_size;
+ RETURN_IF_ERROR(require_available(ptr, end, offsets_bytes, "metadata
dictionary offsets"));
+ std::vector<uint64_t> offsets(dictionary_size + 1);
+ for (uint64_t i = 0; i <= dictionary_size; ++i) {
+ offsets[i] = read_unsigned_le(ptr, offset_size);
+ ptr += offset_size;
+ if (i > 0 && offsets[i] < offsets[i - 1]) {
+ return Status::Corruption("Invalid Parquet VARIANT metadata
dictionary offsets");
+ }
+ }
+
+ RETURN_IF_ERROR(require_available(ptr, end, offsets.back(), "metadata
dictionary bytes"));
+ result->dictionary.clear();
+ result->dictionary.reserve(dictionary_size);
+ for (uint64_t i = 0; i < dictionary_size; ++i) {
+ result->dictionary.emplace_back(reinterpret_cast<const char*>(ptr +
offsets[i]),
+ offsets[i + 1] - offsets[i]);
+ }
+ return Status::OK();
+}
+
+Status decode_value(const uint8_t* ptr, const uint8_t* end, const
VariantMetadata& metadata,
+ std::string* json, const uint8_t** next);
+
+Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const
uint8_t* end,
+ std::string* json, const uint8_t** next) {
+ switch (primitive_header) {
+ case 0:
+ json->append("null");
+ *next = ptr;
+ return Status::OK();
+ case 1:
+ json->append("true");
+ *next = ptr;
+ return Status::OK();
+ case 2:
+ json->append("false");
+ *next = ptr;
+ return Status::OK();
+ case 3:
+ RETURN_IF_ERROR(require_available(ptr, end, 1, "int8 value"));
+ json->append(std::to_string(static_cast<int8_t>(*ptr)));
+ *next = ptr + 1;
+ return Status::OK();
+ case 4:
+ RETURN_IF_ERROR(require_available(ptr, end, 2, "int16 value"));
+ json->append(std::to_string(read_signed_le(ptr, 2)));
+ *next = ptr + 2;
+ return Status::OK();
+ case 5:
+ RETURN_IF_ERROR(require_available(ptr, end, 4, "int32 value"));
+ json->append(std::to_string(read_signed_le(ptr, 4)));
+ *next = ptr + 4;
+ return Status::OK();
+ case 6:
+ RETURN_IF_ERROR(require_available(ptr, end, 8, "int64 value"));
+ json->append(std::to_string(read_signed_le(ptr, 8)));
+ *next = ptr + 8;
+ return Status::OK();
+ case 7: {
+ RETURN_IF_ERROR(require_available(ptr, end, 8, "double value"));
+ uint64_t bits = read_unsigned_le(ptr, 8);
+ double value;
+ std::memcpy(&value, &bits, sizeof(value));
+ append_floating_json(value, json);
+ *next = ptr + 8;
+ return Status::OK();
+ }
+ case 8:
+ case 9:
+ case 10: {
+ int value_size = primitive_header == 8 ? 4 : primitive_header == 9 ? 8
: 16;
+ RETURN_IF_ERROR(require_available(ptr, end, 1 + value_size, "decimal
value"));
+ int scale = static_cast<int8_t>(*ptr++);
+ __int128 unscaled = 0;
+ if (value_size == 16) {
+ unsigned __int128 unsigned_value = 0;
+ for (int i = 15; i >= 0; --i) {
+ unsigned_value <<= 8;
+ unsigned_value |= ptr[i];
+ }
+ if ((ptr[15] & 0x80) != 0) {
+ unscaled = -static_cast<__int128>((~unsigned_value) + 1);
+ } else {
+ unscaled = static_cast<__int128>(unsigned_value);
+ }
+ } else {
+ unscaled = read_signed_le(ptr, value_size);
+ }
+ append_decimal_json(unscaled, scale, json);
+ *next = ptr + value_size;
+ return Status::OK();
+ }
+ case 11:
+ RETURN_IF_ERROR(require_available(ptr, end, 4, "date value"));
+ json->append(std::to_string(read_signed_le(ptr, 4)));
+ *next = ptr + 4;
+ return Status::OK();
+ case 12:
+ case 13:
+ case 17:
+ case 18:
+ case 19:
+ RETURN_IF_ERROR(require_available(ptr, end, 8, "time or timestamp
value"));
+ json->append(std::to_string(read_signed_le(ptr, 8)));
+ *next = ptr + 8;
+ return Status::OK();
+ case 14: {
+ RETURN_IF_ERROR(require_available(ptr, end, 4, "float value"));
+ uint32_t bits = static_cast<uint32_t>(read_unsigned_le(ptr, 4));
+ float value;
+ std::memcpy(&value, &bits, sizeof(value));
+ append_floating_json(value, json);
+ *next = ptr + 4;
+ return Status::OK();
+ }
+ case 15:
+ case 16: {
Review Comment:
Parquet VARIANT `binary` is not equivalent to `string`, and the payload is
not required to be valid UTF-8. This branch serializes primitive 15 by passing
the raw bytes through JSON string escaping, which only escapes control
characters; bytes such as `0xff` produce invalid JSON/UTF-8 (or are later
treated as an ordinary Doris string via the invalid-JSON fallback). The
shredded path has the same semantic gap for `TYPE_VARBINARY` in
`_field_to_json()`. Please define a stable representation or reject unsupported
binary values explicitly, and cover a non-UTF8 binary VARIANT value so valid
Parquet files do not silently change type or fail depending on parser settings.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]