This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2f2d48866862ed0f6c996d840114d88cc9a4a065
Author: 苏小刚 <[email protected]>
AuthorDate: Fri Mar 22 10:36:07 2024 +0800

    [opt](parquet) Support hive struct schema change (#32438)

    Followup: #31128

    This optimization allows Doris to correctly read struct-type data after the
    struct schema has been changed in Hive.

    ## Changing the struct schema in Hive

    ```sql
    hive> create table struct_test(id int,sf struct<f1: int, f2: string>) stored as parquet;
    hive> insert into struct_test values
        > (1, named_struct('f1', 1, 'f2', 's1')),
        > (2, named_struct('f1', 2, 'f2', 's2')),
        > (3, named_struct('f1', 3, 'f2', 's3'));
    hive> alter table struct_test change sf sf struct<f1:int, f3:string>;
    hive> select * from struct_test;
    OK
    1       {"f1":1,"f3":null}
    2       {"f1":2,"f3":null}
    3       {"f1":3,"f3":null}
    Time taken: 5.298 seconds, Fetched: 3 row(s)
    ```

    The previous result in Doris was:

    ```sql
    mysql> select * from struct_test;
    +------+-----------------------+
    | id   | sf                    |
    +------+-----------------------+
    |    1 | {"f1": 1, "f3": "s1"} |
    |    2 | {"f1": 2, "f3": "s2"} |
    |    3 | {"f1": 3, "f3": "s3"} |
    +------+-----------------------+
    ```

    Now the result is the same as in Hive:

    ```sql
    mysql> select * from struct_test;
    +------+-----------------------+
    | id   | sf                    |
    +------+-----------------------+
    |    1 | {"f1": 1, "f3": null} |
    |    2 | {"f1": 2, "f3": null} |
    |    3 | {"f1": 3, "f3": null} |
    +------+-----------------------+
    ```
---
 be/src/vec/data_types/data_type_struct.h            |  1 +
 .../exec/format/parquet/vparquet_column_reader.cpp  | 59 +++++++++++++++++-----
 .../exec/format/parquet/vparquet_column_reader.h    | 29 ++++++++---
 3 files changed, 68 insertions(+), 21 deletions(-)
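
The core idea of the patch, stripped of Doris internals, is to match each field of the table-side struct schema to a file-side column reader by name instead of by position, and to fill fields that no longer exist in the Parquet file with NULLs. The following is a minimal, self-contained C++ sketch of that strategy; `FakeFieldReader`, `StructSchema`, and `read_struct` are hypothetical stand-ins for illustration, not the Doris classes touched by this patch.

```cpp
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for a Parquet child column reader: yields one value per row.
struct FakeFieldReader {
    std::vector<int> values;
};

// Stand-in for the table-side struct schema after the Hive schema change.
using StructSchema = std::vector<std::string>;

// Read `num_rows` rows of a struct column. Fields present in the file are read
// through their reader; fields missing from the file become NULL (nullopt).
std::unordered_map<std::string, std::vector<std::optional<int>>> read_struct(
        const StructSchema& schema,
        const std::unordered_map<std::string, FakeFieldReader>& file_readers, size_t num_rows) {
    std::unordered_map<std::string, std::vector<std::optional<int>>> result;
    for (const auto& field_name : schema) {
        auto it = file_readers.find(field_name); // match by name, not by position
        std::vector<std::optional<int>> column;
        for (size_t row = 0; row < num_rows; ++row) {
            if (it == file_readers.end()) {
                column.emplace_back(std::nullopt); // field missing in the file -> NULL
            } else {
                column.emplace_back(it->second.values[row]);
            }
        }
        result[field_name] = std::move(column);
    }
    return result;
}

int main() {
    // File written before the schema change: struct<f1, f2>.
    std::unordered_map<std::string, FakeFieldReader> file_readers = {
            {"f1", {{1, 2, 3}}},
            {"f2", {{10, 20, 30}}},
    };
    // Table schema after `alter table ... change sf sf struct<f1, f3>`.
    StructSchema schema = {"f1", "f3"};

    auto result = read_struct(schema, file_readers, 3);
    for (const auto& field_name : schema) {
        std::cout << field_name << ":";
        for (const auto& v : result[field_name]) {
            std::cout << ' ' << (v ? std::to_string(*v) : "null");
        }
        std::cout << '\n';
    }
    // Prints "f1: 1 2 3" and "f3: null null null": the old f2 data is never
    // attached to the renamed field, matching the Hive output shown above.
}
```

Matching by name is what makes the before/after results in the commit message line up: data for a column that was dropped or renamed in Hive is simply never surfaced under the new field name.
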
diff --git a/be/src/vec/data_types/data_type_struct.h b/be/src/vec/data_types/data_type_struct.h
index ad1a42a011d..3638b0d110a 100644
--- a/be/src/vec/data_types/data_type_struct.h
+++ b/be/src/vec/data_types/data_type_struct.h
@@ -113,6 +113,7 @@ public:
 
     const DataTypePtr& get_element(size_t i) const { return elems[i]; }
     const DataTypes& get_elements() const { return elems; }
+    const String& get_element_name(size_t i) const { return names[i]; }
     const Strings& get_element_names() const { return names; }
 
     size_t get_position_by_name(const String& name) const;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index f813d14b63b..27b377048fb 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -142,14 +142,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
         reader.reset(map_reader.release());
     } else if (field->type.type == TYPE_STRUCT) {
-        std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
+        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> child_readers;
         child_readers.reserve(field->children.size());
         for (int i = 0; i < field->children.size(); ++i) {
             std::unique_ptr<ParquetColumnReader> child_reader;
             RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
                                    child_reader, max_buf_size));
             child_reader->set_nested_column();
-            child_readers.emplace_back(std::move(child_reader));
+            child_readers[field->children[i].name] = std::move(child_reader);
         }
         auto struct_reader = StructColumnReader::create_unique(row_ranges, ctz, io_ctx);
         RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
@@ -701,8 +701,9 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t
     return Status::OK();
 }
 
-Status StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
-                                FieldSchema* field) {
+Status StructColumnReader::init(
+        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
+        FieldSchema* field) {
     _field_schema = field;
     _child_readers = std::move(child_readers);
     return Status::OK();
@@ -728,19 +729,33 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
     }
     auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
-    if (_child_readers.size() != doris_struct.tuple_size()) {
-        return Status::InternalError("Wrong number of struct fields");
-    }
 
     const DataTypeStruct* doris_struct_type =
             reinterpret_cast<const DataTypeStruct*>(remove_nullable(type).get());
-    for (int i = 0; i < doris_struct.tuple_size(); ++i) {
+
+    bool least_one_reader = false;
+    std::vector<size_t> missing_column_idxs {};
+
+    _read_column_names.clear();
+
+    for (size_t i = 0; i < doris_struct.tuple_size(); ++i) {
         ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
-        DataTypePtr& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
+        auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
+        auto& doris_name = const_cast<String&>(doris_struct_type->get_element_name(i));
+
+        // remember the missing column index
+        if (_child_readers.find(doris_name) == _child_readers.end()) {
+            missing_column_idxs.push_back(i);
+            continue;
+        }
+
+        _read_column_names.insert(doris_name);
+
         select_vector.reset();
         size_t field_rows = 0;
         bool field_eof = false;
-        if (i == 0) {
-            RETURN_IF_ERROR(_child_readers[i]->read_column_data(
+        if (!least_one_reader) {
+            least_one_reader = true;
+            RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
                     doris_field, doris_type, select_vector, batch_size, &field_rows, &field_eof,
                     is_dict_filter));
             *read_rows = field_rows;
@@ -749,7 +764,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
             while (field_rows < *read_rows && !field_eof) {
                 size_t loop_rows = 0;
                 select_vector.reset();
-                RETURN_IF_ERROR(_child_readers[i]->read_column_data(
+                RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
                         doris_field, doris_type, select_vector, *read_rows - field_rows,
                         &loop_rows, &field_eof, is_dict_filter));
                 field_rows += loop_rows;
@@ -759,9 +774,25 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
         }
     }
 
+    if (!least_one_reader) {
+        // TODO: support read struct which columns are all missing
+        return Status::Corruption("Not support read struct '{}' which columns are all missing",
+                                  _field_schema->name);
+    }
+
+    // fill missing column with null or default value
+    for (auto idx : missing_column_idxs) {
+        auto& doris_field = doris_struct.get_column_ptr(idx);
+        auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(idx));
+        DCHECK(doris_type->is_nullable());
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(doris_field)).mutate().get());
+        nullable_column->insert_null_elements(*read_rows);
+    }
+
     if (null_map_ptr != nullptr) {
-        fill_struct_null_map(_field_schema, *null_map_ptr, _child_readers[0]->get_rep_level(),
-                             _child_readers[0]->get_def_level());
+        fill_struct_null_map(_field_schema, *null_map_ptr, this->get_rep_level(),
+                             this->get_def_level());
     }
 
     return Status::OK();
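
One small C++ detail called out by a comment in the header diff below: the const getters look children up with `find()` because `std::unordered_map::operator[]` may insert a default-constructed element for a missing key, so it is only available on non-const maps. A minimal standalone illustration of that constraint (the `Holder` and `Levels` types here are hypothetical stand-ins, not the Doris readers):

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for the per-field state a child reader would expose.
struct Levels {
    std::vector<int> rep_level;
};

class Holder {
public:
    Holder() { _children["f1"].rep_level = {0, 1}; } // operator[] is fine on a non-const map

    const std::vector<int>& get_rep_level() const {
        // `_children["f1"]` would not compile here: operator[] can insert a
        // default-constructed element, so it does not exist for const maps.
        return _children.find("f1")->second.rep_level;
    }

private:
    std::unordered_map<std::string, Levels> _children;
};

int main() {
    Holder h;
    assert(h.get_rep_level().size() == 2);
    return 0;
}
```

`at()` would also work in a const context, at the cost of throwing when the key is absent; `find()` leaves the "key not present" handling to the caller.
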
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index d15d6d5efa1..249e2d94878 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -25,6 +25,7 @@
 #include <list>
 #include <memory>
 #include <ostream>
+#include <unordered_map>
 #include <vector>
 
 #include "io/fs/buffered_reader.h"
@@ -262,24 +263,37 @@ public:
             : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
     ~StructColumnReader() override { close(); }
 
-    Status init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
-                FieldSchema* field);
+    Status init(
+            std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
+            FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t batch_size,
                             size_t* read_rows, bool* eof, bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
-        return _child_readers[0]->get_rep_level();
+        if (!_read_column_names.empty()) {
+            // can't use _child_readers[*_read_column_names.begin()]
+            // because the operator[] of std::unordered_map is not const :(
+            return _child_readers.find(*_read_column_names.begin())->second->get_rep_level();
+        }
+        return _child_readers.begin()->second->get_rep_level();
     }
+
     const std::vector<level_t>& get_def_level() const override {
-        return _child_readers[0]->get_def_level();
+        if (!_read_column_names.empty()) {
+            return _child_readers.find(*_read_column_names.begin())->second->get_def_level();
+        }
+        return _child_readers.begin()->second->get_def_level();
     }
 
     Statistics statistics() override {
         Statistics st;
         for (const auto& reader : _child_readers) {
-            Statistics cst = reader->statistics();
-            st.merge(cst);
+            // make sure the field is read
+            if (_read_column_names.find(reader.first) != _read_column_names.end()) {
+                Statistics cst = reader.second->statistics();
+                st.merge(cst);
+            }
         }
         return st;
     }
@@ -287,7 +301,8 @@ public:
     void close() override {}
 
 private:
-    std::vector<std::unique_ptr<ParquetColumnReader>> _child_readers;
+    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
+    std::set<std::string> _read_column_names;
 };
 
 }; // namespace doris::vectorized

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
