This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3dd909f7511c8186f7319355ce976c7979f02964 Author: Ashin Gau <[email protected]> AuthorDate: Tue Aug 22 14:47:10 2023 +0800 [fix](parquet) A row of complex type may be stored across more pages (#23277) A row of complex type may be stored across two (or more) pages, and the parameter `align_rows` indicates whether the reader should read the remaining values of the last row in the previous page. --- .../exec/format/parquet/vparquet_column_reader.cpp | 63 +++++++++++++++------- .../exec/format/parquet/vparquet_column_reader.h | 2 +- .../data/external_table_p2/tvf/test_tvf_p2.out | 3 ++ .../external_table_p2/tvf/test_tvf_p2.groovy | 7 +++ 4 files changed, 55 insertions(+), 20 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 0a688f6ed3..1da9a4f3c8 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -303,11 +303,25 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter); } +/** + * Load the nested column data of complex type. + * A row of complex type may be stored across two(or more) pages, and the parameter `align_rows` indicates that + * whether the reader should read the remaining value of the last row in previous page. + */ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof, bool is_dict_filter) { - _rep_levels.resize(0); - _def_levels.resize(0); + size_t* read_rows, bool* eof, bool is_dict_filter, + bool align_rows = false) { + size_t origin_size = 0; + if (align_rows) { + origin_size = _rep_levels.size(); + // just read the remaining values of the last row in previous page, + // so there's no a new row should be read. 
+ batch_size = 0; + } else { + _rep_levels.resize(0); + _def_levels.resize(0); + } size_t parsed_rows = 0; size_t remaining_values = _chunk_reader->remaining_num_values(); bool has_rep_level = _chunk_reader->max_rep_level() > 0; @@ -327,22 +341,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType _rep_levels.emplace_back(rep_level); remaining_values--; } - } else { + } else if (!align_rows) { + // case : required child columns in struct type parsed_rows = std::min(remaining_values, batch_size); remaining_values -= parsed_rows; - _rep_levels.resize(parsed_rows); - for (size_t i = 0; i < parsed_rows; ++i) { - _rep_levels[i] = 0; - } + _rep_levels.resize(parsed_rows, 0); } size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values; - _def_levels.resize(parsed_values); + _def_levels.resize(origin_size + parsed_values); if (has_def_level) { - _chunk_reader->def_level_decoder().get_levels(&_def_levels[0], parsed_values); + _chunk_reader->def_level_decoder().get_levels(&_def_levels[origin_size], parsed_values); } else { - for (size_t i = 0; i < parsed_values; ++i) { - _def_levels[i] = 0; - } + std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0); } MutableColumnPtr data_column; @@ -360,14 +370,14 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType } data_column = doris_column->assume_mutable(); } - size_t has_read = 0; + size_t has_read = origin_size; size_t ancestor_nulls = 0; null_map.emplace_back(0); bool prev_is_null = false; - while (has_read < parsed_values) { + while (has_read < origin_size + parsed_values) { level_t def_level = _def_levels[has_read++]; size_t loop_read = 1; - while (has_read < parsed_values && _def_levels[has_read] == def_level) { + while (has_read < origin_size + parsed_values && _def_levels[has_read] == def_level) { has_read++; loop_read++; } @@ -407,9 +417,24 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType 
_chunk_reader->skip_values(ancestor_nulls, false); } - *read_rows = parsed_rows; - if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) { - *eof = true; + if (!align_rows) { + *read_rows = parsed_rows; + } + if (_chunk_reader->remaining_num_values() == 0) { + if (_chunk_reader->has_next_page()) { + RETURN_IF_ERROR(_chunk_reader->next_page()); + RETURN_IF_ERROR(_chunk_reader->load_page_data()); + select_vector.reset(); + return _read_nested_column(doris_column, type, select_vector, 0, read_rows, eof, + is_dict_filter, true); + } else { + *eof = true; + } + } + if (_rep_levels.size() > 0) { + // make sure the rows of complex type are aligned correctly, + // so the repetition level of first element should be 0, meaning a new row is started. + DCHECK_EQ(_rep_levels[0], 0); } return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index df531bf29c..d83e3ef9b5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -189,7 +189,7 @@ private: ColumnSelectVector& select_vector, bool is_dict_filter); Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof, bool is_dict_filter); + size_t* read_rows, bool* eof, bool is_dict_filter, bool align_rows); Status _try_load_dict_page(bool* loaded, bool* has_dict); }; diff --git a/regression-test/data/external_table_p2/tvf/test_tvf_p2.out b/regression-test/data/external_table_p2/tvf/test_tvf_p2.out index 3e34f50764..cb7239f2ff 100644 --- a/regression-test/data/external_table_p2/tvf/test_tvf_p2.out +++ b/regression-test/data/external_table_p2/tvf/test_tvf_p2.out @@ -39,3 +39,6 @@ -- !nested_types_orc -- 20926 20928 20978 23258 20962 23258 23258 +-- !row_cross_pages -- +25001 25001 25001 + diff --git 
a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy index 030e66b372..d9ff3f6044 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy @@ -46,5 +46,12 @@ suite("test_tvf_p2", "p2") { "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc", "format" = "orc", "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + + // a row of complex type may be stored across more pages + qt_row_cross_pages """select count(id), count(m1), count(m2) + from hdfs( + "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet", + "format" = "parquet", + "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
