This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch vectorized in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit b20b5b7e4310a91c6cb42a8cc1ba4c6850bd3af2 Author: Pxl <[email protected]> AuthorDate: Wed Jan 12 09:57:10 2022 +0800 [Vectorized][Enhancement] fix some bug & improve some code (#7714) --- .../aggregate_function_reader.cpp | 8 +++----- be/src/vec/exec/volap_scan_node.cpp | 2 ++ be/src/vec/olap/block_reader.cpp | 22 ++++++++++++++-------- be/src/vec/olap/block_reader.h | 4 +++- run-be-ut.sh | 2 +- 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp index 9a24ac5..3594d51 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp @@ -23,9 +23,8 @@ namespace doris::vectorized { void register_aggregate_function_reader(AggregateFunctionSimpleFactory& factory) { // add a suffix to the function name here to distinguish special functions of agg reader auto register_function_reader = [&](const std::string& name, - const AggregateFunctionCreator& creator, - bool nullable = false) { - factory.register_function(name + agg_reader_suffix, creator, nullable); + const AggregateFunctionCreator& creator) { + factory.register_function(name + agg_reader_suffix, creator, false); }; register_function_reader("sum", create_aggregate_function_sum_reader); @@ -38,8 +37,7 @@ void register_aggregate_function_reader(AggregateFunctionSimpleFactory& factory) void register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& factory) { auto register_function_reader = [&](const std::string& name, - const AggregateFunctionCreator& creator, - bool nullable = false) { + const AggregateFunctionCreator& creator, bool nullable) { factory.register_function(name + agg_reader_suffix, creator, nullable); }; diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index da7a204..b365c1d 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ 
b/be/src/vec/exec/volap_scan_node.cpp @@ -259,6 +259,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { } _scan_cpu_timer->update(cpu_watch.elapsed_time()); _scanner_wait_worker_timer->update(wait_time); + + std::unique_lock<std::mutex> l(_scan_blocks_lock); _running_thread--; // The transfer thead will wait for `_running_thread==0`, to make sure all scanner threads won't access class members. diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index ef3ba3a..f7a1388 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -72,6 +72,10 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params, } void BlockReader::_init_agg_state() { + if (_eof) { + return; + } + _stored_data_block = _next_row.block->create_same_struct_block(_batch_size); _stored_data_columns = _stored_data_block->mutate_columns(); @@ -260,7 +264,8 @@ OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, void BlockReader::_insert_data_normal(MutableColumns& columns) { auto block = _next_row.block; for (auto idx : _normal_columns_idx) { - columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column, _next_row.row_pos); + columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column, + _next_row.row_pos); } } @@ -270,7 +275,7 @@ void BlockReader::_append_agg_data(MutableColumns& columns) { // execute aggregate when have `batch_size` column or some ref invalid soon bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1); - if (_stored_row_ref.size() == _batch_size || is_last) { + if (is_last || _stored_row_ref.size() == _batch_size) { _update_agg_data(columns); } } @@ -301,11 +306,9 @@ void BlockReader::_update_agg_data(MutableColumns& columns) { } void BlockReader::_copy_agg_data() { - phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, int16_t>>> temp_ref_map; - for (int i = 0; i < _stored_row_ref.size(); i++) { 
auto& ref = _stored_row_ref[i]; - temp_ref_map[ref.block].emplace_back(ref.row_pos, i); + _temp_ref_map[ref.block].emplace_back(ref.row_pos, i); } for (auto idx : _agg_columns_idx) { @@ -314,11 +317,11 @@ void BlockReader::_copy_agg_data() { //string type should replace ordered for (int i = 0; i < _stored_row_ref.size(); i++) { auto& ref = _stored_row_ref[i]; - dst_column->replace_column_data( - *ref.block->get_by_position(idx).column, ref.row_pos, i); + dst_column->replace_column_data(*ref.block->get_by_position(idx).column, + ref.row_pos, i); } } else { - for (auto& it : temp_ref_map) { + for (auto& it : _temp_ref_map) { auto& src_column = *it.first->get_by_position(idx).column; for (auto& pos : it.second) { dst_column->replace_column_data(src_column, pos.first, pos.second); @@ -327,6 +330,9 @@ void BlockReader::_copy_agg_data() { } } + for (auto& it : _temp_ref_map) { + it.second.clear(); + } _stored_row_ref.clear(); } diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 9199072..02cb570 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -86,7 +86,7 @@ private: void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true); VCollectIterator _vcollect_iter; - IteratorRowRef _next_row{nullptr, -1, false}; + IteratorRowRef _next_row {nullptr, -1, false}; std::vector<AggregateFunctionPtr> _agg_functions; std::vector<AggregateDataPtr> _agg_places; @@ -107,6 +107,8 @@ private: std::vector<bool> _stored_has_null_tag; std::vector<bool> _stored_has_string_tag; + phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, int16_t>>> _temp_ref_map; + bool _eof = false; OLAPStatus (BlockReader::*_next_block_func)(Block* block, MemPool* mem_pool, diff --git a/run-be-ut.sh b/run-be-ut.sh index 1fce850..7af98f7 100755 --- a/run-be-ut.sh +++ b/run-be-ut.sh @@ -73,7 +73,7 @@ fi eval set -- "$OPTS" -PARALLEL=$[$(nproc)/4+1] +PARALLEL=$[$(nproc)/8+1] CLEAN= RUN= if [ $# == 1 ] ; then 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
