This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 3dd722bc4ca2850a34e9df691ff666e8ec500688 Author: SleepyBear <[email protected]> AuthorDate: Mon Apr 25 10:01:42 2022 +0800 [fix](broker load) sync the workflow of BrokerScanner to other Scanner to avoid oom (#9173) --- be/src/exec/base_scanner.cpp | 13 ++++++++++++- be/src/exec/base_scanner.h | 6 +++++- be/src/exec/broker_scanner.cpp | 12 +++--------- be/src/exec/broker_scanner.h | 2 +- be/src/exec/json_scanner.cpp | 3 +-- be/src/exec/orc_scanner.cpp | 3 +-- be/src/exec/parquet_scanner.cpp | 3 +-- 7 files changed, 24 insertions(+), 18 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index ab004b17f0..cd9bf8685e 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() { return Status::OK(); } -Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { +Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) { + RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool)); + if (_success) { + free_expr_local_allocations(); + *fill_tuple = true; + } else { + *fill_tuple = false; + } + return Status::OK(); +} + +Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { // filter src tuple by preceding filter first if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) { _counter->num_rows_unselected++; diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index a745e938d2..21abf080f9 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -58,12 +58,13 @@ public: // Close this scanner virtual void close() = 0; - Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); + Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple); void fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path); void free_expr_local_allocations(); + protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; @@ -106,6 +107,9 @@ protected: // Used to record whether a row of data is successfully read. bool _success = false; bool _scanner_eof = false; + +private: + Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); }; } /* namespace doris */ diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 96e9e315c1..00c31d3c1f 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -114,13 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool)); - if (_success) { - free_expr_local_allocations(); - *fill_tuple = true; - } else { - *fill_tuple = false; - } + RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple)); break; // break always } } @@ -461,14 +455,14 @@ bool is_null(const Slice& slice) { } // Convert one row to this tuple -Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) { +Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) { RETURN_IF_ERROR(_line_to_src_tuple(line)); if (!_success) { // If not success, which means we met an invalid row, return. return Status::OK(); } - return fill_dest_tuple(tuple, tuple_pool); + return fill_dest_tuple(tuple, tuple_pool, fill_tuple); } // Convert one row to this tuple diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index d6deb7d64f..ddd8bd9c78 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -87,7 +87,7 @@ private: // Convert one row to one tuple // 'ptr' and 'len' is csv text line // output is tuple - Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool); + Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple); Status _line_to_src_tuple(const Slice& line); diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 67a58c4216..d651545806 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -96,8 +96,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } if (_scanner_eof) { diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 0c69945271..8537189348 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -357,8 +357,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; } if (_scanner_eof) { diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 9a85a1253a..3295dc4bc7 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } if (_scanner_eof) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
