This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new c3c9d3b28ba [feature](be) Support parquet struct scalar assembly
c3c9d3b28ba is described below

commit c3c9d3b28ba02688071f5cc0c065c775eb3353bd
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 17:16:33 2026 +0800

    [feature](be) Support parquet struct scalar assembly
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: Support parquet STRUCT reading with scalar children
    through definition-level assembly, including nullable parent struct
    handling and projected struct child reads.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Manual test
        - Ran git diff --check locally.
        - Ran BUILD_TYPE=DEBUG ./build.sh --be on Fedora.
    - Behavior changed: Yes
        - New parquet reader now supports nullable STRUCT columns with scalar
          children and projected scalar struct children.
    - Does this need documentation: No
---
 be/src/format/new_parquet/column_reader.cpp        | 133 +++++++++++++++++++--
 .../new_parquet/parquet_column_reader_test.cpp     | 113 +++++++++++++++++
 2 files changed, 237 insertions(+), 9 deletions(-)

diff --git a/be/src/format/new_parquet/column_reader.cpp 
b/be/src/format/new_parquet/column_reader.cpp
index 9d9ac98ced9..37d1efa322e 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -110,6 +110,7 @@ public:
     StructColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
                        std::vector<std::unique_ptr<ParquetColumnReader>> 
children)
             : _field_id(schema.top_level_field_id),
+              _nullable_definition_level(schema.nullable_definition_level),
               _type(std::move(type)),
               _name(schema.name),
               _children(std::move(children)) {}
@@ -124,6 +125,7 @@ public:
 
 private:
     int _field_id = -1;
+    int16_t _nullable_definition_level = 0;
     DataTypePtr _type;
     std::string _name;
     std::vector<std::unique_ptr<ParquetColumnReader>> _children;
@@ -586,6 +588,13 @@ ColumnMap* map_column_from_output(MutableColumnPtr& 
column) {
     return assert_cast<ColumnMap*>(column.get());
 }
 
+ColumnStruct* struct_column_from_output(MutableColumnPtr& column) {
+    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) 
{
+        return 
assert_cast<ColumnStruct*>(&nullable_column->get_nested_column());
+    }
+    return assert_cast<ColumnStruct*>(column.get());
+}
+
 NullMap* null_map_from_nullable_output(MutableColumnPtr& column) {
     if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) 
{
         return &nullable_column->get_null_map_data();
@@ -732,14 +741,120 @@ Status StructColumnReader::read(int64_t rows, 
MutableColumnPtr& column, int64_t*
         return Status::OK();
     }
 
+    auto* struct_column = struct_column_from_output(column);
+    DORIS_CHECK(struct_column != nullptr);
+    auto* parent_null_map = null_map_from_nullable_output(column);
+    DCHECK_EQ(struct_column->get_columns().size(), _children.size());
+
+    std::vector<ScalarColumnReader*> scalar_children;
+    scalar_children.reserve(_children.size());
+    bool all_scalar_children = true;
+    for (const auto& child_reader : _children) {
+        DORIS_CHECK(child_reader != nullptr);
+        auto* scalar_child = 
dynamic_cast<ScalarColumnReader*>(child_reader.get());
+        if (scalar_child == nullptr) {
+            all_scalar_children = false;
+            break;
+        }
+        scalar_children.push_back(scalar_child);
+    }
+    if (all_scalar_children) {
+        std::vector<NestedScalarBatch> child_batches(scalar_children.size());
+        int64_t expected_rows = -1;
+        for (size_t child_idx = 0; child_idx < scalar_children.size(); 
++child_idx) {
+            
RETURN_IF_ERROR(read_nested_scalar_batch(*scalar_children[child_idx], rows,
+                                                     
_nullable_definition_level,
+                                                     
&child_batches[child_idx]));
+            if (expected_rows < 0) {
+                expected_rows = child_batches[child_idx].records_read;
+            } else if (child_batches[child_idx].records_read != expected_rows) 
{
+                return Status::Corruption(
+                        "Parquet struct children returned different row counts 
in column {}: {} "
+                        "vs {}",
+                        _name, expected_rows, 
child_batches[child_idx].records_read);
+            }
+            if (child_batches[child_idx].levels_written != 
child_batches[child_idx].records_read) {
+                return Status::Corruption(
+                        "Parquet struct child {} returned repeated levels in 
column {}",
+                        scalar_children[child_idx]->name(), _name);
+            }
+        }
+
+        if (expected_rows <= 0) {
+            *rows_read = 0;
+            return Status::OK();
+        }
+
+        std::vector<MutableColumnPtr> child_columns;
+        child_columns.reserve(scalar_children.size());
+        for (size_t child_idx = 0; child_idx < scalar_children.size(); 
++child_idx) {
+            
child_columns.push_back(struct_column->get_column_ptr(child_idx)->assume_mutable());
+        }
+
+        NullMap parent_nulls;
+        parent_nulls.reserve(static_cast<size_t>(expected_rows));
+        for (int64_t row_idx = 0; row_idx < expected_rows; ++row_idx) {
+            const bool parent_is_null =
+                    child_batches[0].def_levels[row_idx] < 
_nullable_definition_level;
+            parent_nulls.push_back(parent_is_null);
+            for (size_t child_idx = 1; child_idx < child_batches.size(); 
++child_idx) {
+                const bool child_parent_is_null =
+                        child_batches[child_idx].def_levels[row_idx] < 
_nullable_definition_level;
+                if (child_parent_is_null != parent_is_null) {
+                    return Status::Corruption(
+                            "Parquet struct children returned different null 
parent shape in "
+                            "column {}",
+                            _name);
+                }
+            }
+            for (size_t child_idx = 0; child_idx < scalar_children.size(); 
++child_idx) {
+                if (parent_is_null) {
+                    child_columns[child_idx]->insert_default();
+                } else {
+                    if (!scalar_children[child_idx]->type()->is_nullable() &&
+                        child_batches[child_idx].def_levels[row_idx] !=
+                                
scalar_children[child_idx]->descriptor()->max_definition_level()) {
+                        return Status::Corruption(
+                                "Parquet STRUCT column {} contains null for 
non-nullable child {}",
+                                _name, scalar_children[child_idx]->name());
+                    }
+                    
RETURN_IF_ERROR(append_scalar_batch_value(*scalar_children[child_idx],
+                                                              
child_batches[child_idx], row_idx,
+                                                              
child_columns[child_idx]));
+                }
+            }
+        }
+        for (size_t child_idx = 0; child_idx < child_columns.size(); 
++child_idx) {
+            struct_column->get_column_ptr(child_idx) = 
std::move(child_columns[child_idx]);
+        }
+        if (parent_null_map == nullptr) {
+            for (const auto parent_is_null : parent_nulls) {
+                if (parent_is_null) {
+                    return Status::Corruption(
+                            "Parquet STRUCT column {} contains null for 
non-nullable struct",
+                            _name);
+                }
+            }
+        } else {
+            append_parent_nulls(parent_null_map, parent_nulls);
+        }
+        *rows_read = expected_rows;
+        return Status::OK();
+    }
+
+    if (parent_null_map != nullptr) {
+        return Status::NotSupported(
+                "Current parquet nullable STRUCT reader only supports scalar 
children for column "
+                "{}",
+                _name);
+    }
+
     int64_t expected_rows = -1;
     size_t child_idx = 0;
-    DCHECK_EQ(assert_cast<ColumnStruct&>(*column).get_columns().size(), 
_children.size());
     for (auto& child_reader : _children) {
         DORIS_CHECK(child_reader != nullptr);
         int64_t child_rows = 0;
-        auto child_column =
-                
assert_cast<ColumnStruct&>(*column).get_column_ptr(child_idx)->assume_mutable();
+        auto child_column = 
struct_column->get_column_ptr(child_idx)->assume_mutable();
         RETURN_IF_ERROR(child_reader->read(rows, child_column, &child_rows));
         if (expected_rows < 0) {
             expected_rows = child_rows;
@@ -748,6 +863,7 @@ Status StructColumnReader::read(int64_t rows, 
MutableColumnPtr& column, int64_t*
                     "Parquet struct children returned different row counts in 
column {}: {} vs {}",
                     _name, expected_rows, child_rows);
         }
+        struct_column->get_column_ptr(child_idx) = std::move(child_column);
         child_idx++;
     }
 
@@ -1288,11 +1404,6 @@ Status 
ParquetColumnReaderFactory::create_struct_column_reader(
     if (reader == nullptr) {
         return Status::InvalidArgument("reader is null");
     }
-    if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
-        return Status::NotSupported(
-                "Nullable parquet STRUCT reader is not implemented for column 
{}",
-                column_schema.name);
-    }
     std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
     child_readers.reserve(column_schema.children.size());
     DataTypes projected_child_types;
@@ -1311,7 +1422,11 @@ Status 
ParquetColumnReaderFactory::create_struct_column_reader(
             child_projection = &*it;
         }
         std::unique_ptr<ParquetColumnReader> child_reader;
-        RETURN_IF_ERROR(create(*child_schema, child_projection, 
&child_reader));
+        if (child_schema->kind == ParquetColumnSchemaKind::PRIMITIVE) {
+            RETURN_IF_ERROR(create_nested_scalar_column_reader(*child_schema, 
&child_reader));
+        } else {
+            RETURN_IF_ERROR(create(*child_schema, child_projection, 
&child_reader));
+        }
         projected_child_types.push_back(child_reader->type());
         projected_child_names.push_back(child_reader->name());
         child_readers.push_back(std::move(child_reader));
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp 
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index 50aa801f4c7..059e9b709aa 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -148,6 +148,33 @@ protected:
         return finish_array(&builder);
     }
 
+    std::shared_ptr<arrow::Array> build_nullable_struct_array() {
+        auto struct_type = arrow::struct_(
+                {arrow::field("a", arrow::int32(), false), arrow::field("b", 
arrow::utf8(), true)});
+        std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders;
+        auto a_array_builder = std::make_unique<arrow::Int32Builder>();
+        
field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(a_array_builder)));
+        auto b_array_builder = std::make_unique<arrow::StringBuilder>();
+        
field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(b_array_builder)));
+        arrow::StructBuilder builder(struct_type, arrow::default_memory_pool(),
+                                     std::move(field_builders));
+        auto* a_builder = 
assert_cast<arrow::Int32Builder*>(builder.field_builder(0));
+        auto* b_builder = 
assert_cast<arrow::StringBuilder*>(builder.field_builder(1));
+
+        EXPECT_TRUE(builder.Append().ok());
+        EXPECT_TRUE(a_builder->Append(201).ok());
+        EXPECT_TRUE(b_builder->Append("nsa").ok());
+        EXPECT_TRUE(builder.AppendNull().ok());
+        EXPECT_TRUE(builder.Append().ok());
+        EXPECT_TRUE(a_builder->Append(203).ok());
+        EXPECT_TRUE(b_builder->AppendNull().ok());
+        EXPECT_TRUE(builder.Append().ok());
+        EXPECT_TRUE(a_builder->Append(204).ok());
+        EXPECT_TRUE(b_builder->Append("nsd").ok());
+        EXPECT_TRUE(builder.AppendNull().ok());
+        return finish_array(&builder);
+    }
+
     std::shared_ptr<arrow::Array> build_required_int_list_array() {
         auto value_builder = std::make_shared<arrow::Int32Builder>();
         arrow::ListBuilder builder(arrow::default_memory_pool(), 
value_builder);
@@ -480,6 +507,41 @@ protected:
                       EXPECT_EQ(b_values.get_data_at(1).to_string(), "sb");
                       EXPECT_EQ(b_values.get_data_at(4).to_string(), "se");
                   });
+        add_field(arrow::field("nullable_struct_col",
+                               arrow::struct_({
+                                       arrow::field("a", arrow::int32(), 
false),
+                                       arrow::field("b", arrow::utf8(), true),
+                               }),
+                               true),
+                  build_nullable_struct_array(),
+                  [](const ParquetColumnSchema& schema, const IColumn& column) 
{
+                      EXPECT_TRUE(schema.type->is_nullable());
+                      const auto& nullable_column = assert_cast<const 
ColumnNullable&>(column);
+                      ASSERT_EQ(nullable_column.size(), ROW_COUNT);
+                      EXPECT_FALSE(nullable_column.is_null_at(0));
+                      EXPECT_TRUE(nullable_column.is_null_at(1));
+                      EXPECT_FALSE(nullable_column.is_null_at(2));
+                      EXPECT_FALSE(nullable_column.is_null_at(3));
+                      EXPECT_TRUE(nullable_column.is_null_at(4));
+
+                      const auto& struct_column =
+                              assert_cast<const 
ColumnStruct&>(nullable_column.get_nested_column());
+                      ASSERT_EQ(struct_column.get_columns().size(), 2);
+                      const auto& a_values =
+                              assert_cast<const 
ColumnInt32&>(struct_column.get_column(0));
+                      const auto& b_values =
+                              assert_cast<const 
ColumnNullable&>(struct_column.get_column(1));
+                      const auto& b_nested =
+                              assert_cast<const 
ColumnString&>(b_values.get_nested_column());
+                      EXPECT_EQ(a_values.get_element(0), 201);
+                      EXPECT_EQ(a_values.get_element(2), 203);
+                      EXPECT_EQ(a_values.get_element(3), 204);
+                      EXPECT_FALSE(b_values.is_null_at(0));
+                      EXPECT_TRUE(b_values.is_null_at(2));
+                      EXPECT_FALSE(b_values.is_null_at(3));
+                      EXPECT_EQ(b_nested.get_data_at(0).to_string(), "nsa");
+                      EXPECT_EQ(b_nested.get_data_at(3).to_string(), "nsd");
+                  });
         add_field(arrow::field("list_int_col",
                                arrow::list(arrow::field("element", 
arrow::int32(), false)), false),
                   build_required_int_list_array(),
@@ -717,6 +779,7 @@ TEST_F(ParquetColumnReaderTest, 
ReadAllSupportedPhysicalAndLogicalTypes) {
 
 TEST_F(ParquetColumnReaderTest, ReadSupportedComplexTypes) {
     read_and_validate(find_field_idx("struct_col"));
+    read_and_validate(find_field_idx("nullable_struct_col"));
     read_and_validate(find_field_idx("list_int_col"));
     read_and_validate(find_field_idx("nullable_list_int_col"));
     read_and_validate(find_field_idx("required_nullable_list_int_col"));
@@ -798,6 +861,56 @@ TEST_F(ParquetColumnReaderTest, 
ReadProjectedStructChildren) {
     EXPECT_EQ(values.get_data_at(4).to_string(), "se");
 }
 
+TEST_F(ParquetColumnReaderTest, ReadProjectedNullableStructChildren) {
+    const auto field_idx = find_field_idx("nullable_struct_col");
+    ASSERT_LT(field_idx, _fields.size());
+    const auto& struct_schema = *_fields[field_idx];
+    ASSERT_EQ(struct_schema.name, "nullable_struct_col");
+    ASSERT_EQ(struct_schema.children.size(), 2);
+
+    reader::FieldProjection projection;
+    projection.file_column_id = struct_schema.top_level_field_id;
+    projection.file_path = struct_schema.file_path;
+    projection.project_all_children = false;
+    reader::FieldProjection child_projection;
+    child_projection.file_column_id = struct_schema.top_level_field_id;
+    child_projection.file_path = struct_schema.children[1]->file_path;
+    projection.children.push_back(std::move(child_projection));
+
+    ParquetColumnReaderFactory factory(_row_group, 
_file_reader->metadata()->num_columns());
+    std::unique_ptr<ParquetColumnReader> reader;
+    auto st = factory.create(struct_schema, &projection, &reader);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(reader->type()->is_nullable());
+    ASSERT_EQ(remove_nullable(reader->type())->get_primitive_type(), 
TYPE_STRUCT);
+    const auto* projected_type =
+            assert_cast<const 
DataTypeStruct*>(remove_nullable(reader->type()).get());
+    ASSERT_EQ(projected_type->get_elements().size(), 1);
+    EXPECT_EQ(projected_type->get_element_name(0), "b");
+
+    MutableColumnPtr column = reader->type()->create_column();
+    int64_t rows_read = 0;
+    st = reader->read(ROW_COUNT, column, &rows_read);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(rows_read, ROW_COUNT);
+    const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+    EXPECT_FALSE(nullable_column.is_null_at(0));
+    EXPECT_TRUE(nullable_column.is_null_at(1));
+    EXPECT_FALSE(nullable_column.is_null_at(2));
+    EXPECT_FALSE(nullable_column.is_null_at(3));
+    EXPECT_TRUE(nullable_column.is_null_at(4));
+    const auto& struct_column =
+            assert_cast<const 
ColumnStruct&>(nullable_column.get_nested_column());
+    ASSERT_EQ(struct_column.get_columns().size(), 1);
+    const auto& values = assert_cast<const 
ColumnNullable&>(struct_column.get_column(0));
+    const auto& nested_values = assert_cast<const 
ColumnString&>(values.get_nested_column());
+    EXPECT_FALSE(values.is_null_at(0));
+    EXPECT_TRUE(values.is_null_at(2));
+    EXPECT_FALSE(values.is_null_at(3));
+    EXPECT_EQ(nested_values.get_data_at(0).to_string(), "nsa");
+    EXPECT_EQ(nested_values.get_data_at(3).to_string(), "nsd");
+}
+
 TEST_F(ParquetColumnReaderTest, ReadListWithOverflowAcrossChunks) {
     const auto field_idx = find_field_idx("nullable_list_int_col");
     auto reader = create_reader(field_idx);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to