github-actions[bot] commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3229734781


##########
be/src/format/parquet/vparquet_column_reader.cpp:
##########
@@ -1001,6 +1091,368 @@ Status StructColumnReader::read_column_data(
     return Status::OK();
 }
 
+Status VariantColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+                                 const tparquet::RowGroup& row_group, size_t 
max_buf_size,
+                                 std::unordered_map<int, 
tparquet::OffsetIndex>& col_offsets,
+                                 RuntimeState* state, bool in_collection,
+                                 const std::set<uint64_t>& column_ids,
+                                 const std::set<uint64_t>& filter_column_ids) {
+    _field_schema = field;
+    _variant_struct_field = std::make_unique<FieldSchema>(*field);
+
+    DataTypes child_types;
+    Strings child_names;
+    child_types.reserve(field->children.size());
+    child_names.reserve(field->children.size());
+    for (const auto& child : field->children) {
+        child_types.push_back(make_nullable(child.data_type));
+        child_names.push_back(child.name);
+    }
+    _variant_struct_type = std::make_shared<DataTypeStruct>(child_types, 
child_names);
+    if (field->data_type->is_nullable()) {
+        _variant_struct_type = make_nullable(_variant_struct_type);
+    }
+    _variant_struct_field->data_type = _variant_struct_type;
+
+    RETURN_IF_ERROR(ParquetColumnReader::create(file, 
_variant_struct_field.get(), row_group,
+                                                _row_ranges, _ctz, _io_ctx, 
_struct_reader,
+                                                max_buf_size, col_offsets, 
state, in_collection,
+                                                column_ids, 
filter_column_ids));
+    _struct_reader->set_column_in_nested();
+    return Status::OK();
+}
+
+Status VariantColumnReader::_get_binary_field(const Field& field, std::string* 
value,
+                                              bool* present) const {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    *present = true;
+    switch (field.get_type()) {
+    case TYPE_STRING:
+        *value = field.get<TYPE_STRING>();
+        return Status::OK();
+    case TYPE_CHAR:
+        *value = field.get<TYPE_CHAR>();
+        return Status::OK();
+    case TYPE_VARCHAR:
+        *value = field.get<TYPE_VARCHAR>();
+        return Status::OK();
+    case TYPE_VARBINARY: {
+        auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
+        value->assign(ref.data, ref.size);
+        return Status::OK();
+    }
+    default:
+        return Status::Corruption("Parquet VARIANT binary field has unexpected 
Doris type {}",
+                                  field.get_type_name());
+    }
+}
+
+Status VariantColumnReader::_field_to_json(const FieldSchema& field_schema, 
const Field& field,
+                                           std::string* json, bool* present) 
const {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    *present = true;
+    const DataTypePtr& type = remove_nullable(field_schema.data_type);
+    switch (type->get_primitive_type()) {
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+    case TYPE_LARGEINT:
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256:
+        json->append(field.to_debug_string(type->get_scale()));
+        return Status::OK();

Review Comment:
   Shredded `FLOAT`/`DOUBLE` values are emitted with `Field::to_debug_string()` 
and then parsed as JSON. For Parquet-valid NaN/Inf values this can produce 
non-JSON tokens, while the unshredded decoder explicitly normalizes non-finite 
floating values to JSON `null` via `append_floating_json()`. That makes the 
same VARIANT value fail or decode differently depending on whether it is stored 
in `value` or `typed_value`. Please handle non-finite float/double values 
consistently in the shredded typed-value path and add a regression/unit case.



##########
be/src/format/parquet/vparquet_column_reader.cpp:
##########
@@ -1001,6 +1091,368 @@ Status StructColumnReader::read_column_data(
     return Status::OK();
 }
 
+Status VariantColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+                                 const tparquet::RowGroup& row_group, size_t 
max_buf_size,
+                                 std::unordered_map<int, 
tparquet::OffsetIndex>& col_offsets,
+                                 RuntimeState* state, bool in_collection,
+                                 const std::set<uint64_t>& column_ids,
+                                 const std::set<uint64_t>& filter_column_ids) {
+    _field_schema = field;
+    _variant_struct_field = std::make_unique<FieldSchema>(*field);
+
+    DataTypes child_types;
+    Strings child_names;
+    child_types.reserve(field->children.size());
+    child_names.reserve(field->children.size());
+    for (const auto& child : field->children) {
+        child_types.push_back(make_nullable(child.data_type));
+        child_names.push_back(child.name);
+    }
+    _variant_struct_type = std::make_shared<DataTypeStruct>(child_types, 
child_names);
+    if (field->data_type->is_nullable()) {
+        _variant_struct_type = make_nullable(_variant_struct_type);
+    }
+    _variant_struct_field->data_type = _variant_struct_type;
+
+    RETURN_IF_ERROR(ParquetColumnReader::create(file, 
_variant_struct_field.get(), row_group,
+                                                _row_ranges, _ctz, _io_ctx, 
_struct_reader,
+                                                max_buf_size, col_offsets, 
state, in_collection,
+                                                column_ids, 
filter_column_ids));
+    _struct_reader->set_column_in_nested();
+    return Status::OK();
+}
+
+Status VariantColumnReader::_get_binary_field(const Field& field, std::string* 
value,
+                                              bool* present) const {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    *present = true;
+    switch (field.get_type()) {
+    case TYPE_STRING:
+        *value = field.get<TYPE_STRING>();
+        return Status::OK();
+    case TYPE_CHAR:
+        *value = field.get<TYPE_CHAR>();
+        return Status::OK();
+    case TYPE_VARCHAR:
+        *value = field.get<TYPE_VARCHAR>();
+        return Status::OK();
+    case TYPE_VARBINARY: {
+        auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
+        value->assign(ref.data, ref.size);
+        return Status::OK();
+    }
+    default:
+        return Status::Corruption("Parquet VARIANT binary field has unexpected 
Doris type {}",
+                                  field.get_type_name());
+    }
+}
+
+Status VariantColumnReader::_field_to_json(const FieldSchema& field_schema, 
const Field& field,
+                                           std::string* json, bool* present) 
const {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    *present = true;
+    const DataTypePtr& type = remove_nullable(field_schema.data_type);
+    switch (type->get_primitive_type()) {
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+    case TYPE_LARGEINT:
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256:
+        json->append(field.to_debug_string(type->get_scale()));
+        return Status::OK();
+    case TYPE_TIMEV2:
+        json->append(std::to_string(field.get<TYPE_TIMEV2>()));
+        return Status::OK();
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+    case TYPE_DATEV2:
+    case TYPE_DATETIMEV2:
+    case TYPE_TIMESTAMPTZ:
+    case TYPE_STRING:
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_VARBINARY: {
+        std::string value = field.to_debug_string(type->get_scale());
+        append_json_string(value, json);
+        return Status::OK();
+    }
+    case TYPE_ARRAY:
+        return _array_field_to_json(field_schema, field,
+                                    assert_cast<const 
DataTypeArray*>(type.get()), json);
+    case TYPE_STRUCT:
+        return _struct_field_to_json(field_schema, field, json);
+    default:
+        return Status::Corruption("Unsupported Parquet VARIANT typed_value 
Doris type {}",
+                                  type->get_name());
+    }
+}
+
+Status VariantColumnReader::_array_field_to_json(const FieldSchema& 
field_schema,
+                                                 const Field& field,
+                                                 const DataTypeArray* 
array_type,
+                                                 std::string* json) const {
+    const auto& values = field.get<TYPE_ARRAY>();
+    const FieldSchema* element_schema =
+            field_schema.children.empty() ? nullptr : 
field_schema.children.data();
+    FieldSchema synthetic_element;
+    if (element_schema == nullptr) {
+        synthetic_element.data_type = array_type->get_nested_type();
+        element_schema = &synthetic_element;
+    }
+    json->push_back('[');
+    for (int i = 0; i < values.size(); ++i) {
+        if (i != 0) {
+            json->push_back(',');
+        }
+        std::string element_json;
+        bool element_present = false;
+        RETURN_IF_ERROR(
+                _field_to_json(*element_schema, values[i], &element_json, 
&element_present));
+        json->append(element_present ? element_json : "null");
+    }
+    json->push_back(']');
+    return Status::OK();
+}
+
+Status VariantColumnReader::_struct_field_to_json(const FieldSchema& 
field_schema,
+                                                  const Field& field, 
std::string* json) const {
+    const auto& values = field.get<TYPE_STRUCT>();
+    json->push_back('{');
+    for (int i = 0; i < values.size(); ++i) {
+        if (i != 0) {
+            json->push_back(',');
+        }
+        const FieldSchema& child_schema = field_schema.children[i];
+        append_json_string(child_schema.name, json);
+        json->push_back(':');
+        std::string child_json;
+        bool child_present = false;
+        RETURN_IF_ERROR(_field_to_json(child_schema, values[i], &child_json, 
&child_present));
+        json->append(child_present ? child_json : "null");
+    }
+    json->push_back('}');
+    return Status::OK();
+}
+
+Status VariantColumnReader::_typed_value_to_json(const FieldSchema& 
typed_value_field,
+                                                 const Field& field, const 
std::string& metadata,
+                                                 std::string* json, bool* 
present) const {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    const DataTypePtr& typed_type = 
remove_nullable(typed_value_field.data_type);
+    if (typed_type->get_primitive_type() == TYPE_STRUCT) {
+        *present = true;
+        return _typed_struct_to_json(typed_value_field, field, metadata, json);
+    }
+    if (typed_type->get_primitive_type() == TYPE_ARRAY) {
+        *present = true;
+        return _typed_array_to_json(typed_value_field, field, metadata, json);
+    }
+    return _field_to_json(typed_value_field, field, json, present);
+}
+
+Status VariantColumnReader::_typed_struct_to_json(const FieldSchema& 
typed_value_field,
+                                                  const Field& field, const 
std::string& metadata,
+                                                  std::string* json) const {
+    const auto& values = field.get<TYPE_STRUCT>();
+    json->push_back('{');
+    bool first = true;
+    for (int i = 0; i < typed_value_field.children.size(); ++i) {
+        std::string child_json;
+        bool child_present = false;
+        RETURN_IF_ERROR(_variant_to_json(typed_value_field.children[i], 
values[i], &metadata,
+                                         &child_json, &child_present));
+        if (!child_present) {
+            continue;
+        }
+        if (!first) {
+            json->push_back(',');
+        }
+        first = false;
+        append_json_string(typed_value_field.children[i].name, json);
+        json->push_back(':');
+        json->append(child_json);
+    }
+    json->push_back('}');
+    return Status::OK();
+}
+
+Status VariantColumnReader::_typed_array_to_json(const FieldSchema& 
typed_value_field,
+                                                 const Field& field, const 
std::string& metadata,
+                                                 std::string* json) const {
+    const auto& values = field.get<TYPE_ARRAY>();
+    const FieldSchema* element_schema =
+            typed_value_field.children.empty() ? nullptr : 
typed_value_field.children.data();
+    json->push_back('[');
+    for (int i = 0; i < values.size(); ++i) {
+        if (i != 0) {
+            json->push_back(',');
+        }
+        std::string element_json;
+        bool element_present = false;
+        if (element_schema != nullptr && find_child_idx(*element_schema, 
"value") >= 0) {
+            RETURN_IF_ERROR(_variant_to_json(*element_schema, values[i], 
&metadata, &element_json,

Review Comment:
   This only recognizes shredded array elements when the element group contains 
a `value` child, but the Parquet Variant Shredding spec allows array `element` 
to omit `value` when elements are fully shredded as a specific `typed_value` 
type. For a valid schema like `typed_value (LIST) -> element { optional binary 
typed_value (STRING) }`, this branch falls through to 
`_field_to_json(*element_schema, ...)` and reconstructs each element as an 
object such as `{"typed_value":"x"}` instead of `"x"`. Please treat an element 
group with either `value` or `typed_value` as a shredded variant element and 
add coverage for the value-omitted array-element layout.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to