This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch vectorized
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b20b5b7e4310a91c6cb42a8cc1ba4c6850bd3af2
Author: Pxl <[email protected]>
AuthorDate: Wed Jan 12 09:57:10 2022 +0800

    [Vectorized][Enhancement] fix some bugs & improve some code (#7714)
---
 .../aggregate_function_reader.cpp                  |  8 +++-----
 be/src/vec/exec/volap_scan_node.cpp                |  2 ++
 be/src/vec/olap/block_reader.cpp                   | 22 ++++++++++++++--------
 be/src/vec/olap/block_reader.h                     |  4 +++-
 run-be-ut.sh                                       |  2 +-
 5 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
index 9a24ac5..3594d51 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
@@ -23,9 +23,8 @@ namespace doris::vectorized {
 void register_aggregate_function_reader(AggregateFunctionSimpleFactory& 
factory) {
     // add a suffix to the function name here to distinguish special functions 
of agg reader
     auto register_function_reader = [&](const std::string& name,
-                                        const AggregateFunctionCreator& 
creator,
-                                        bool nullable = false) {
-        factory.register_function(name + agg_reader_suffix, creator, nullable);
+                                        const AggregateFunctionCreator& 
creator) {
+        factory.register_function(name + agg_reader_suffix, creator, false);
     };
 
     register_function_reader("sum", create_aggregate_function_sum_reader);
@@ -38,8 +37,7 @@ void 
register_aggregate_function_reader(AggregateFunctionSimpleFactory& factory)
 
 void 
register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& 
factory) {
     auto register_function_reader = [&](const std::string& name,
-                                        const AggregateFunctionCreator& 
creator,
-                                        bool nullable = false) {
+                                        const AggregateFunctionCreator& 
creator, bool nullable) {
         factory.register_function(name + agg_reader_suffix, creator, nullable);
     };
 
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index da7a204..b365c1d 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -259,6 +259,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     }
     _scan_cpu_timer->update(cpu_watch.elapsed_time());
     _scanner_wait_worker_timer->update(wait_time);
+
+    std::unique_lock<std::mutex> l(_scan_blocks_lock);
     _running_thread--;
 
     // The transfer thead will wait for `_running_thread==0`, to make sure all 
scanner threads won't access class members.
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index ef3ba3a..f7a1388 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -72,6 +72,10 @@ OLAPStatus BlockReader::_init_collect_iter(const 
ReaderParams& read_params,
 }
 
 void BlockReader::_init_agg_state() {
+    if (_eof) {
+        return;
+    }
+
     _stored_data_block = 
_next_row.block->create_same_struct_block(_batch_size);
     _stored_data_columns = _stored_data_block->mutate_columns();
 
@@ -260,7 +264,8 @@ OLAPStatus BlockReader::_unique_key_next_block(Block* 
block, MemPool* mem_pool,
 void BlockReader::_insert_data_normal(MutableColumns& columns) {
     auto block = _next_row.block;
     for (auto idx : _normal_columns_idx) {
-        
columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column,
 _next_row.row_pos);
+        
columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column,
+                                                       _next_row.row_pos);
     }
 }
 
@@ -270,7 +275,7 @@ void BlockReader::_append_agg_data(MutableColumns& columns) 
{
 
     // execute aggregate when have `batch_size` column or some ref invalid soon
     bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
-    if (_stored_row_ref.size() == _batch_size || is_last) {
+    if (is_last || _stored_row_ref.size() == _batch_size) {
         _update_agg_data(columns);
     }
 }
@@ -301,11 +306,9 @@ void BlockReader::_update_agg_data(MutableColumns& 
columns) {
 }
 
 void BlockReader::_copy_agg_data() {
-    phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, 
int16_t>>> temp_ref_map;
-
     for (int i = 0; i < _stored_row_ref.size(); i++) {
         auto& ref = _stored_row_ref[i];
-        temp_ref_map[ref.block].emplace_back(ref.row_pos, i);
+        _temp_ref_map[ref.block].emplace_back(ref.row_pos, i);
     }
 
     for (auto idx : _agg_columns_idx) {
@@ -314,11 +317,11 @@ void BlockReader::_copy_agg_data() {
             //string type should replace ordered
             for (int i = 0; i < _stored_row_ref.size(); i++) {
                 auto& ref = _stored_row_ref[i];
-                dst_column->replace_column_data(
-                        *ref.block->get_by_position(idx).column, ref.row_pos, 
i);
+                
dst_column->replace_column_data(*ref.block->get_by_position(idx).column,
+                                                ref.row_pos, i);
             }
         } else {
-            for (auto& it : temp_ref_map) {
+            for (auto& it : _temp_ref_map) {
                 auto& src_column = *it.first->get_by_position(idx).column;
                 for (auto& pos : it.second) {
                     dst_column->replace_column_data(src_column, pos.first, 
pos.second);
@@ -327,6 +330,9 @@ void BlockReader::_copy_agg_data() {
         }
     }
 
+    for (auto& it : _temp_ref_map) {
+        it.second.clear();
+    }
     _stored_row_ref.clear();
 }
 
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 9199072..02cb570 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -86,7 +86,7 @@ private:
     void _update_agg_value(MutableColumns& columns, int begin, int end, bool 
is_close = true);
 
     VCollectIterator _vcollect_iter;
-    IteratorRowRef _next_row{nullptr, -1, false};
+    IteratorRowRef _next_row {nullptr, -1, false};
 
     std::vector<AggregateFunctionPtr> _agg_functions;
     std::vector<AggregateDataPtr> _agg_places;
@@ -107,6 +107,8 @@ private:
     std::vector<bool> _stored_has_null_tag;
     std::vector<bool> _stored_has_string_tag;
 
+    phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, 
int16_t>>> _temp_ref_map;
+
     bool _eof = false;
 
     OLAPStatus (BlockReader::*_next_block_func)(Block* block, MemPool* 
mem_pool,
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 1fce850..7af98f7 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -73,7 +73,7 @@ fi
 
 eval set -- "$OPTS"
 
-PARALLEL=$[$(nproc)/4+1]
+PARALLEL=$[$(nproc)/8+1]
 CLEAN=
 RUN=
 if [ $# == 1 ] ; then

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to