This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 473359772c229d2591b2059837e5dc31c6899fe0 Author: Ashin Gau <[email protected]> AuthorDate: Tue Aug 22 13:35:29 2023 +0800 [fix](parquet) parquet reader confuses logical/physical/slot id of columns (#23198) `ParquetReader` confuses the logical, physical, and slot ids of columns. When only scalar types are read, nothing goes wrong, but when complex types are read, `RowGroup` and `PageIndex` read the wrong statistics. Therefore, if a query contains complex types and pushed-down predicates, the result set is likely to be incorrect. --- be/src/vec/exec/format/parquet/parquet_common.h | 8 -- .../exec/format/parquet/vparquet_column_reader.h | 2 - .../exec/format/parquet/vparquet_group_reader.cpp | 11 +-- .../exec/format/parquet/vparquet_group_reader.h | 8 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 97 +++++++++++----------- be/src/vec/exec/format/parquet/vparquet_reader.h | 6 +- be/test/vec/exec/parquet/parquet_thrift_test.cpp | 4 +- .../hive/test_hive_text_complex_type.out | 3 + .../hive/test_hive_text_complex_type.groovy | 6 +- 9 files changed, 68 insertions(+), 77 deletions(-) diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index 7c35a2b111..0a4278ae67 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -46,14 +46,6 @@ struct RowRange { } }; -struct ParquetReadColumn { - ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name) - : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {}; - - int _parquet_col_id; - const std::string& _file_slot_name; -}; - #pragma pack(1) struct ParquetInt96 { uint64_t lo; // time of nanoseconds in a day diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 6d0e2e3f6e..df531bf29c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -132,7 +132,6 @@ public: const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, io::IOContext* io_ctx, std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size); - void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; } void set_nested_column() { _nested_column = true; } virtual const std::vector<level_t>& get_rep_level() const = 0; virtual const std::vector<level_t>& get_def_level() const = 0; @@ -149,7 +148,6 @@ protected: const std::vector<RowRange>& _row_ranges; cctz::time_zone* _ctz; io::IOContext* _io_ctx; - tparquet::OffsetIndex* _offset_index; int64_t _current_row_index = 0; int _row_range_index = 0; int64_t _decode_null_map_time = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 3ef3581e36..d3aa4c3cad 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -78,7 +78,7 @@ namespace doris::vectorized { const std::vector<int64_t> RowGroupReader::NO_DELETE = {}; RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, - const std::vector<ParquetReadColumn>& read_columns, + const std::vector<std::string>& read_columns, const int32_t row_group_id, const tparquet::RowGroup& row_group, cctz::time_zone* ctz, io::IOContext* io_ctx, const PositionDeleteContext& position_delete_ctx, @@ -126,21 +126,16 @@ Status RowGroupReader::init( const size_t MAX_COLUMN_BUF_SIZE =
config::parquet_column_max_buffer_mb << 20; size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size()); for (auto& read_col : _read_columns) { - auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name)); + auto field = const_cast<FieldSchema*>(schema.get_column(read_col)); std::unique_ptr<ParquetColumnReader> reader; RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta, _read_ranges, _ctz, _io_ctx, reader, max_buf_size)); - auto col_iter = col_offsets.find(read_col._parquet_col_id); - if (col_iter != col_offsets.end()) { - tparquet::OffsetIndex oi = col_iter->second; - reader->add_offset_index(&oi); - } if (reader == nullptr) { VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed"; return Status::Corruption("Init row group reader failed"); } - _column_readers[read_col._file_slot_name] = std::move(reader); + _column_readers[read_col] = std::move(reader); } // Check if single slot can be filtered by dict. if (!_slot_id_to_filter_conjuncts) { diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index baa5912f99..c44899b583 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -141,9 +141,9 @@ public: PositionDeleteContext(const PositionDeleteContext& filter) = default; }; - RowGroupReader(io::FileReaderSPtr file_reader, - const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id, - const tparquet::RowGroup& row_group, cctz::time_zone* ctz, io::IOContext* io_ctx, + RowGroupReader(io::FileReaderSPtr file_reader, const std::vector<std::string>& read_columns, + const int32_t row_group_id, const tparquet::RowGroup& row_group, + cctz::time_zone* ctz, io::IOContext* io_ctx, const PositionDeleteContext& position_delete_ctx, const LazyReadContext& lazy_read_ctx, RuntimeState* state); @@ -191,7 +191,7 @@ private: io::FileReaderSPtr _file_reader; std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers; - const std::vector<ParquetReadColumn>& _read_columns; + const std::vector<std::string>& _read_columns; const int32_t _row_group_id; const tparquet::RowGroup& _row_group_meta; int64_t _remaining_rows; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index ba80992a04..f8dab141b7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -322,6 +322,10 @@ Status ParquetReader::init_reader( // e.g. table added a column after this parquet file was written. _column_names = &all_column_names; auto schema_desc = _file_metadata->schema(); + std::set<std::string> required_columns(all_column_names.begin(), all_column_names.end()); + // Currently only used in Iceberg: these columns were dropped and then added back + std::set<std::string> dropped_columns(missing_column_names.begin(), missing_column_names.end()); + // Make the order of read columns the same as physical order in parquet file for (int i = 0; i < schema_desc.size(); ++i) { auto name = schema_desc.get_column(i)->name; // If the column in parquet file is included in all_column_names and not in missing_column_names, // the column should be read. // Here to check against missing_column_names is for the 'Add a column back to the table // with the same column name' case. (drop column a then add column a).
// Shouldn't read this column data in this case. - if (find(all_column_names.begin(), all_column_names.end(), name) != - all_column_names.end() && - find(missing_column_names.begin(), missing_column_names.end(), name) == - missing_column_names.end()) { - _map_column.emplace(name, i); + if (required_columns.find(name) != required_columns.end() && + dropped_columns.find(name) == dropped_columns.end()) { + required_columns.erase(name); + _read_columns.emplace_back(name); } } + for (const std::string& name : required_columns) { + _missing_cols.emplace_back(name); + } + _colname_to_value_range = colname_to_value_range; - RETURN_IF_ERROR(_init_read_columns()); // build column predicates for column lazy read _lazy_read_ctx.conjuncts = conjuncts; RETURN_IF_ERROR(_init_row_groups(filter_groups)); @@ -394,15 +400,15 @@ Status ParquetReader::set_fill_columns( const FieldDescriptor& schema = _file_metadata->schema(); for (auto& read_col : _read_columns) { - _lazy_read_ctx.all_read_columns.emplace_back(read_col._file_slot_name); - PrimitiveType column_type = schema.get_column(read_col._file_slot_name)->type.type; + _lazy_read_ctx.all_read_columns.emplace_back(read_col); + PrimitiveType column_type = schema.get_column(read_col)->type.type; if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) { _has_complex_type = true; } if (predicate_columns.size() > 0) { - auto iter = predicate_columns.find(read_col._file_slot_name); + auto iter = predicate_columns.find(read_col); if (iter == predicate_columns.end()) { - _lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name); + _lazy_read_ctx.lazy_read_columns.emplace_back(read_col); } else { _lazy_read_ctx.predicate_columns.first.emplace_back(iter->first); _lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second); @@ -450,29 +456,6 @@ Status ParquetReader::set_fill_columns( return Status::OK(); } -Status ParquetReader::_init_read_columns() { - std::vector<int> include_column_ids; - for (auto& file_col_name : *_column_names) { - auto iter = _map_column.find(file_col_name); - if (iter != _map_column.end()) { - include_column_ids.emplace_back(iter->second); - } else { - _missing_cols.push_back(file_col_name); - } - } - // It is legal to get empty include_column_ids in query task. 
- if (include_column_ids.empty()) { - return Status::OK(); - } - // The same order as physical columns - std::sort(include_column_ids.begin(), include_column_ids.end()); - for (int& parquet_col_id : include_column_ids) { - _read_columns.emplace_back(parquet_col_id, - _file_metadata->schema().get_column(parquet_col_id)->name); - } - return Status::OK(); -} - std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type() { std::unordered_map<std::string, TypeDescriptor> map; const auto& schema_desc = _file_metadata->schema(); @@ -643,11 +626,24 @@ Status ParquetReader::_init_row_groups(const bool& is_filter_groups) { RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group)); } int64_t group_size = 0; // only calculate the needed columns - for (auto& read_col : _read_columns) { - auto& parquet_col_id = read_col._parquet_col_id; - if (row_group.columns[parquet_col_id].__isset.meta_data) { - group_size += row_group.columns[parquet_col_id].meta_data.total_compressed_size; + std::function<int64_t(const FieldSchema*)> column_compressed_size = + [&row_group, &column_compressed_size](const FieldSchema* field) -> int64_t { + if (field->physical_column_index >= 0) { + int parquet_col_id = field->physical_column_index; + if (row_group.columns[parquet_col_id].__isset.meta_data) { + return row_group.columns[parquet_col_id].meta_data.total_compressed_size; + } + return 0; + } + int64_t size = 0; + for (const FieldSchema& child : field->children) { + size += column_compressed_size(&child); } + return size; + }; + for (auto& read_col : _read_columns) { + const FieldSchema* field = _file_metadata->schema().get_column(read_col); + group_size += column_compressed_size(field); } if (!filter_group) { _read_row_groups.emplace_back(row_group_idx, row_index, row_index + row_group.num_rows); @@ -703,7 +699,7 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges( }; const tparquet::RowGroup& row_group = _t_metadata->row_groups[group.row_group_id]; for (const auto& read_col : _read_columns) { - const FieldSchema* field = _file_metadata->schema().get_column(read_col._file_slot_name); + const FieldSchema* field = _file_metadata->schema().get_column(read_col); scalar_range(field, row_group); } if (!result.empty()) { @@ -766,11 +762,16 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, // read twice: parse column index & parse offset index _column_statistics.meta_read_calls += 2; for (auto& read_col : _read_columns) { - auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name); + auto conjunct_iter = _colname_to_value_range->find(read_col); if (_colname_to_value_range->end() == conjunct_iter) { continue; } - auto& chunk = row_group.columns[read_col._parquet_col_id]; + int parquet_col_id = _file_metadata->schema().get_column(read_col)->physical_column_index; + if (parquet_col_id < 0) { + // complex type, page index not supported yet.
+ continue; + } + auto& chunk = row_group.columns[parquet_col_id]; if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) { continue; } @@ -782,7 +783,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, } auto& conjuncts = conjunct_iter->second; std::vector<int> skipped_page_range; - const FieldSchema* col_schema = schema_desc.get_column(read_col._file_slot_name); + const FieldSchema* col_schema = schema_desc.get_column(read_col); page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema, skipped_page_range, *_ctz); if (skipped_page_range.empty()) { @@ -797,7 +798,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, // use the union row range skipped_row_ranges.emplace_back(skipped_row_range); } - _col_offsets.emplace(read_col._parquet_col_id, offset_index); + _col_offsets.emplace(parquet_col_id, offset_index); } if (skipped_row_ranges.empty()) { read_whole_row_group(); @@ -849,16 +850,16 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co return Status::OK(); } auto& schema_desc = _file_metadata->schema(); - for (auto& col_name : *_column_names) { - auto col_iter = _map_column.find(col_name); - if (col_iter == _map_column.end()) { - continue; - } + for (auto& col_name : _read_columns) { auto slot_iter = _colname_to_value_range->find(col_name); if (slot_iter == _colname_to_value_range->end()) { continue; } - int parquet_col_id = col_iter->second; + int parquet_col_id = _file_metadata->schema().get_column(col_name)->physical_column_index; + if (parquet_col_id < 0) { + // complex type, filter not supported yet. + continue; + } auto& meta_data = columns[parquet_col_id].meta_data; auto& statistic = meta_data.statistics; bool is_all_null = diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 0f3996db40..b2f8a2bd19 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -187,7 +187,6 @@ private: RowGroupReader::PositionDeleteContext _get_position_delete_ctx( const tparquet::RowGroup& row_group, const RowGroupReader::RowGroupIndex& row_group_index); - Status _init_read_columns(); Status _init_row_groups(const bool& is_filter_groups); void _init_system_properties(); void _init_file_description(); @@ -226,12 +225,11 @@ private: std::unique_ptr<RowGroupReader> _current_group_reader = nullptr; // read to the end of current reader bool _row_group_eof = true; - int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file - std::map<std::string, int> _map_column; // column-name <---> column-index + int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file // table column name to file column name map. For iceberg schema evolution.
std::unordered_map<std::string, std::string> _table_col_to_file_col; std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; - std::vector<ParquetReadColumn> _read_columns; + std::vector<std::string> _read_columns; RowRange _whole_range = RowRange(0, 0); const std::vector<int64_t>* _delete_rows = nullptr; int64_t _delete_rows_index = 0; diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index bb77b32e58..08879e604a 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -499,11 +499,11 @@ TEST_F(ParquetThriftReaderTest, group_reader) { SlotDescriptor string_slot(tslot_desc); tuple_slots.emplace_back(&string_slot); - std::vector<ParquetReadColumn> read_columns; + std::vector<std::string> read_columns; RowGroupReader::LazyReadContext lazy_read_ctx; for (const auto& slot : tuple_slots) { lazy_read_ctx.all_read_columns.emplace_back(slot->col_name()); - read_columns.emplace_back(ParquetReadColumn(7, slot->col_name())); + read_columns.emplace_back(slot->col_name()); } io::FileSystemSPtr local_fs = io::LocalFileSystem::create(""); io::FileReaderSPtr file_reader; diff --git a/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out b/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out index a04b9c1def..d1113f5228 100644 --- a/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out +++ b/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out @@ -13,3 +13,6 @@ 10 {101:1, 102:1, 103:1} {102:10, 104:1, 105:2} {"field1":100, "field0":100} {"field2":3000000} {"field3":300000000} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28} 11 {101:1, 102:1, 13:1, 12:1} {102:10, 14:1, 15:2, 12:10} {"field1":100, "fie88ld0":100, "fieweld0":100, "fieeeld1":100, "fieeeld0":100, "feeield0":100, "feeield1":100, "firreld0":100, "field0":100} {"field2":3000000, "abcd":4000000, "1231":3000000} {"fi7eld3":300000000, "field30":300000000, "fielwwd3":300000000, "fi055":300000000, "field7":300000121323} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello", 0:"hello"} {"field6":2023-07-28 12:34:56.000000, " [...] 
+-- !filter_complex -- +50000 50000 50000 + diff --git a/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy b/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy index 8ea9f74135..2b0098081c 100644 --- a/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy +++ b/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy @@ -40,7 +40,11 @@ suite("test_hive_text_complex_type", "p2,external,hive,external_remote,external_ qt_sql2 """ select * from hive_text_complex_type_delimiter order by column1; """ - + qt_filter_complex """select count(column_primitive_integer), + count(column1_struct), + count(column_primitive_bigint) + from parquet_predicate_table where column_primitive_bigint = 6;""" + sql """drop catalog ${catalog_name};""" } }
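The crux of the fix is that a field's logical position in the schema is not the physical index of its column chunk in the row group: a nested field (struct/map/array) spans several physical leaf columns and has no single physical id of its own, which is why the patch consults FieldSchema::physical_column_index and recurses over field->children. Below is a minimal self-contained C++ sketch of that idea; the simplified Field type is hypothetical, standing in for Doris's actual FieldSchema/tparquet classes.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for Doris's FieldSchema (illustration only).
struct Field {
    std::string name;
    int physical_column_index;    // >= 0 only for leaf (scalar) columns
    std::vector<Field> children;  // non-empty for struct/map/array fields
    int64_t leaf_compressed_size; // stand-in for ColumnMetaData.total_compressed_size
};

// Mirrors the recursive column_compressed_size lambda in the patch: a nested
// field has no single physical column, so sum over its leaves instead of
// trusting one flat index per logical field.
int64_t compressed_size(const Field& field) {
    if (field.physical_column_index >= 0) {
        return field.leaf_compressed_size;
    }
    int64_t size = 0;
    for (const Field& child : field.children) {
        size += compressed_size(child);
    }
    return size;
}

int main() {
    // Logical order: id (leaf 0), s (a struct spanning leaves 1 and 2), v (leaf 3).
    std::vector<Field> schema = {
        {"id", 0, {}, 100},
        {"s", -1, {{"s.a", 1, {}, 40}, {"s.b", 2, {}, 60}}, 0},
        {"v", 3, {}, 200},
    };
    for (const Field& f : schema) {
        std::cout << f.name << " -> " << compressed_size(f) << " bytes\n";
    }
    return 0;
}

With this schema, indexing row_group.columns[] by a field's logical position would make logical index 2 ("v") read the statistics of physical column 2 (the struct leaf "s.b"); that is the kind of mix-up the commit message describes for RowGroup and PageIndex filtering, and why the fix skips page-index and stat filtering when physical_column_index is negative.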
