This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2f2d48866862ed0f6c996d840114d88cc9a4a065
Author: 苏小刚 <[email protected]>
AuthorDate: Fri Mar 22 10:36:07 2024 +0800

    [opt](parquet) Support Hive struct schema change (#32438)
    
    Follow-up: #31128
    This optimization allows Doris to correctly read struct-type data after the struct schema has been changed in Hive.
    
    ## Changing the struct schema in Hive:
    ```sql
    hive> create table struct_test(id int,sf struct<f1: int, f2: string>) stored as parquet;
    
    hive> insert into struct_test values
        >           (1, named_struct('f1', 1, 'f2', 's1')),
        >           (2, named_struct('f1', 2, 'f2', 's2')),
        >           (3, named_struct('f1', 3, 'f2', 's3'));
    
    hive> alter table struct_test change sf sf struct<f1:int, f3:string>;
    
    hive> select * from struct_test;
    OK
    1       {"f1":1,"f3":null}
    2       {"f1":2,"f3":null}
    3       {"f1":3,"f3":null}
    Time taken: 5.298 seconds, Fetched: 3 row(s)
    ```
    
    Previously, Doris matched struct subfields by position, so the renamed field `f3` incorrectly picked up the old `f2` values:
    ```sql
    mysql> select * from struct_test;
    +------+-----------------------+
    | id   | sf                    |
    +------+-----------------------+
    |    1 | {"f1": 1, "f3": "s1"} |
    |    2 | {"f1": 2, "f3": "s2"} |
    |    3 | {"f1": 3, "f3": "s3"} |
    +------+-----------------------+
    ```
    
    Now subfields are matched by name and missing subfields are filled with null, so the result is the same as in Hive:
    
    ```sql
    mysql> select * from struct_test;
    +------+-----------------------+
    | id   | sf                    |
    +------+-----------------------+
    |    1 | {"f1": 1, "f3": null} |
    |    2 | {"f1": 2, "f3": null} |
    |    3 | {"f1": 3, "f3": null} |
    +------+-----------------------+
    ```
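    
    In short: the struct's child readers are now keyed by subfield name instead of
    being stored in a positional vector, subfields that are missing from the Parquet
    file are filled with nulls, and rep/def levels and statistics are taken only from
    the subfields that were actually read. The toy program below is only a sketch of
    that name-matching idea (it is not Doris code; the container and variable names
    are made up for illustration):
    
    ```cpp
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <vector>
    
    int main() {
        // Subfield columns that exist in a Parquet file written before the
        // Hive schema change: f1 and f2.
        std::unordered_map<std::string, std::vector<std::string>> file_columns = {
                {"f1", {"1", "2", "3"}}, {"f2", {"s1", "s2", "s3"}}};
        // Subfields requested by the current table schema (f2 was renamed to f3).
        std::vector<std::string> requested = {"f1", "f3"};
    
        std::vector<size_t> missing_idxs;
        for (size_t i = 0; i < requested.size(); ++i) {
            auto it = file_columns.find(requested[i]);
            if (it == file_columns.end()) {
                // Remember the missing subfield; it is "filled with null" below.
                missing_idxs.push_back(i);
                continue;
            }
            std::cout << requested[i] << ": read " << it->second.size() << " values\n";
        }
        for (size_t idx : missing_idxs) {
            std::cout << requested[idx] << ": not in file, filled with null\n";
        }
        return 0;
    }
    ```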
---
 be/src/vec/data_types/data_type_struct.h           |  1 +
 .../exec/format/parquet/vparquet_column_reader.cpp | 59 +++++++++++++++++-----
 .../exec/format/parquet/vparquet_column_reader.h   | 29 ++++++++---
 3 files changed, 68 insertions(+), 21 deletions(-)

diff --git a/be/src/vec/data_types/data_type_struct.h b/be/src/vec/data_types/data_type_struct.h
index ad1a42a011d..3638b0d110a 100644
--- a/be/src/vec/data_types/data_type_struct.h
+++ b/be/src/vec/data_types/data_type_struct.h
@@ -113,6 +113,7 @@ public:
 
     const DataTypePtr& get_element(size_t i) const { return elems[i]; }
     const DataTypes& get_elements() const { return elems; }
+    const String& get_element_name(size_t i) const { return names[i]; }
     const Strings& get_element_names() const { return names; }
 
     size_t get_position_by_name(const String& name) const;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index f813d14b63b..27b377048fb 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -142,14 +142,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
         reader.reset(map_reader.release());
     } else if (field->type.type == TYPE_STRUCT) {
-        std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
+        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> child_readers;
         child_readers.reserve(field->children.size());
         for (int i = 0; i < field->children.size(); ++i) {
             std::unique_ptr<ParquetColumnReader> child_reader;
             RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
                                    child_reader, max_buf_size));
             child_reader->set_nested_column();
-            child_readers.emplace_back(std::move(child_reader));
+            child_readers[field->children[i].name] = std::move(child_reader);
         }
         auto struct_reader = StructColumnReader::create_unique(row_ranges, ctz, io_ctx);
         RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
@@ -701,8 +701,9 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t
     return Status::OK();
 }
 
-Status StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
-                                FieldSchema* field) {
+Status StructColumnReader::init(
+        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
+        FieldSchema* field) {
     _field_schema = field;
     _child_readers = std::move(child_readers);
     return Status::OK();
@@ -728,19 +729,33 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
     }
 
     auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
-    if (_child_readers.size() != doris_struct.tuple_size()) {
-        return Status::InternalError("Wrong number of struct fields");
-    }
     const DataTypeStruct* doris_struct_type =
             reinterpret_cast<const DataTypeStruct*>(remove_nullable(type).get());
-    for (int i = 0; i < doris_struct.tuple_size(); ++i) {
+
+    bool least_one_reader = false;
+    std::vector<size_t> missing_column_idxs {};
+
+    _read_column_names.clear();
+
+    for (size_t i = 0; i < doris_struct.tuple_size(); ++i) {
         ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
-        DataTypePtr& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
+        auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
+        auto& doris_name = const_cast<String&>(doris_struct_type->get_element_name(i));
+
+        // remember the missing column index
+        if (_child_readers.find(doris_name) == _child_readers.end()) {
+            missing_column_idxs.push_back(i);
+            continue;
+        }
+
+        _read_column_names.insert(doris_name);
+
         select_vector.reset();
         size_t field_rows = 0;
         bool field_eof = false;
-        if (i == 0) {
-            RETURN_IF_ERROR(_child_readers[i]->read_column_data(
+        if (!least_one_reader) {
+            least_one_reader = true;
+            RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
                     doris_field, doris_type, select_vector, batch_size, &field_rows, &field_eof,
                     is_dict_filter));
             *read_rows = field_rows;
@@ -749,7 +764,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
             while (field_rows < *read_rows && !field_eof) {
                 size_t loop_rows = 0;
                 select_vector.reset();
-                RETURN_IF_ERROR(_child_readers[i]->read_column_data(
+                RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
                         doris_field, doris_type, select_vector, *read_rows - field_rows, &loop_rows,
                         &field_eof, is_dict_filter));
                 field_rows += loop_rows;
@@ -759,9 +774,25 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
         }
     }
 
+    if (!least_one_reader) {
+        // TODO: support read struct which columns are all missing
+        return Status::Corruption("Not support read struct '{}' which columns are all missing",
+                                  _field_schema->name);
+    }
+
+    // fill missing column with null or default value
+    for (auto idx : missing_column_idxs) {
+        auto& doris_field = doris_struct.get_column_ptr(idx);
+        auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(idx));
+        DCHECK(doris_type->is_nullable());
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(doris_field)).mutate().get());
+        nullable_column->insert_null_elements(*read_rows);
+    }
+
     if (null_map_ptr != nullptr) {
-        fill_struct_null_map(_field_schema, *null_map_ptr, _child_readers[0]->get_rep_level(),
-                             _child_readers[0]->get_def_level());
+        fill_struct_null_map(_field_schema, *null_map_ptr, this->get_rep_level(),
+                             this->get_def_level());
     }
 
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index d15d6d5efa1..249e2d94878 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -25,6 +25,7 @@
 #include <list>
 #include <memory>
 #include <ostream>
+#include <unordered_map>
 #include <vector>
 
 #include "io/fs/buffered_reader.h"
@@ -262,24 +263,37 @@ public:
             : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
     ~StructColumnReader() override { close(); }
 
-    Status init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
-                FieldSchema* field);
+    Status init(
+            std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
+            FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
                             bool* eof, bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
-        return _child_readers[0]->get_rep_level();
+        if (!_read_column_names.empty()) {
+            // can't use _child_readers[*_read_column_names.begin()]
+            // because the operator[] of std::unordered_map is not const :(
+            return _child_readers.find(*_read_column_names.begin())->second->get_rep_level();
+        }
+        return _child_readers.begin()->second->get_rep_level();
     }
+
     const std::vector<level_t>& get_def_level() const override {
-        return _child_readers[0]->get_def_level();
+        if (!_read_column_names.empty()) {
+            return _child_readers.find(*_read_column_names.begin())->second->get_def_level();
+        }
+        return _child_readers.begin()->second->get_def_level();
     }
 
     Statistics statistics() override {
         Statistics st;
         for (const auto& reader : _child_readers) {
-            Statistics cst = reader->statistics();
-            st.merge(cst);
+            // make sure the field is read
+            if (_read_column_names.find(reader.first) != _read_column_names.end()) {
+                Statistics cst = reader.second->statistics();
+                st.merge(cst);
+            }
         }
         return st;
     }
@@ -287,7 +301,8 @@ public:
     void close() override {}
 
 private:
-    std::vector<std::unique_ptr<ParquetColumnReader>> _child_readers;
+    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
+    std::set<std::string> _read_column_names;
 };
 
 }; // namespace doris::vectorized

