This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new abc802b5ba9 [bugfix](core) child block is shared between operator and 
node, it should be shared ptr (#28106)
abc802b5ba9 is described below

commit abc802b5ba93bf9dc868ce5996ab591bf3b64dcd
Author: yiguolei <[email protected]>
AuthorDate: Sat Dec 9 00:18:14 2023 +0800

    [bugfix](core) child block is shared between operator and node, it should 
be shared ptr (#28106)
    
    The _child_block of the nested loop join, table function, and repeat nodes is
    shared between the ExecNode and its related operator. The block belongs to the
    exec node, so the operator must not hold it in a unique_ptr.
    
    If the operator's close method is not called correctly, the block is freed
    twice: once by the operator's unique_ptr, and once by the exec node that
    actually owns it.
    
    Making it a shared_ptr, owned jointly by the node and the operator, means the
    process no longer crashes (core dumps) even if the operator's close method is
    not called.
---
 .../exec/nested_loop_join_probe_operator.cpp       |  3 +--
 be/src/pipeline/exec/operator.h                    |  4 +--
 be/src/pipeline/exec/repeat_operator.cpp           |  3 +--
 be/src/pipeline/exec/table_function_operator.cpp   |  3 +--
 be/src/vec/exec/join/vnested_loop_join_node.cpp    | 30 ++++++++++++----------
 be/src/vec/exec/join/vnested_loop_join_node.h      | 10 ++++----
 be/src/vec/exec/vrepeat_node.cpp                   | 14 +++++-----
 be/src/vec/exec/vrepeat_node.h                     |  4 +--
 be/src/vec/exec/vtable_function_node.cpp           | 14 +++++-----
 be/src/vec/exec/vtable_function_node.h             |  8 +++---
 10 files changed, 48 insertions(+), 45 deletions(-)

diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index a3d24aa0614..b7477d0b4f8 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(NestLoopJoinProbeOperator, 
StatefulOperator)
 
 Status NestLoopJoinProbeOperator::prepare(doris::RuntimeState* state) {
     // just for speed up, the way is dangerous
-    _child_block.reset(_node->get_left_block());
+    _child_block = _node->get_left_block();
     return StatefulOperator::prepare(state);
 }
 
 Status NestLoopJoinProbeOperator::close(doris::RuntimeState* state) {
-    _child_block.release();
     return StatefulOperator::close(state);
 }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 2beea932a8b..7ff91fba695 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -444,7 +444,7 @@ public:
 
     StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
             : StreamingOperator<OperatorBuilderType>(builder, node),
-              _child_block(vectorized::Block::create_unique()),
+              _child_block(vectorized::Block::create_shared()),
               _child_source_state(SourceState::DEPEND_ON_SOURCE) {}
 
     virtual ~StatefulOperator() = default;
@@ -484,7 +484,7 @@ public:
     }
 
 protected:
-    std::unique_ptr<vectorized::Block> _child_block;
+    std::shared_ptr<vectorized::Block> _child_block;
     SourceState _child_source_state;
 };
 
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index 40f6f5d7b21..d1613a6125f 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)
 
 Status RepeatOperator::prepare(doris::RuntimeState* state) {
     // just for speed up, the way is dangerous
-    _child_block.reset(_node->get_child_block());
+    _child_block = _node->get_child_block();
     return StatefulOperator::prepare(state);
 }
 
 Status RepeatOperator::close(doris::RuntimeState* state) {
-    _child_block.release();
     return StatefulOperator::close(state);
 }
 
diff --git a/be/src/pipeline/exec/table_function_operator.cpp 
b/be/src/pipeline/exec/table_function_operator.cpp
index 5e1d5c281eb..6e947c640c9 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -33,12 +33,11 @@ OPERATOR_CODE_GENERATOR(TableFunctionOperator, 
StatefulOperator)
 
 Status TableFunctionOperator::prepare(doris::RuntimeState* state) {
     // just for speed up, the way is dangerous
-    _child_block.reset(_node->get_child_block());
+    _child_block = _node->get_child_block();
     return StatefulOperator::prepare(state);
 }
 
 Status TableFunctionOperator::close(doris::RuntimeState* state) {
-    _child_block.release();
     return StatefulOperator::close(state);
 }
 
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index f3214639856..7d8100aa6ca 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -97,7 +97,9 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, 
const TPlanNode& tnod
           _left_block_pos(0),
           _left_side_eos(false),
           _old_version_flag(!tnode.__isset.nested_loop_join_node),
-          _runtime_filter_descs(tnode.runtime_filters) {}
+          _runtime_filter_descs(tnode.runtime_filters) {
+    _left_block = Block::create_shared();
+}
 
 Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state));
@@ -252,15 +254,15 @@ Status VNestedLoopJoinNode::push(doris::RuntimeState* 
state, vectorized::Block*
 
 Status VNestedLoopJoinNode::_fresh_left_block(doris::RuntimeState* state) {
     do {
-        release_block_memory(_left_block);
+        release_block_memory(*_left_block);
         RETURN_IF_ERROR(child(0)->get_next_after_projects(
-                state, &_left_block, &_left_side_eos,
+                state, _left_block.get(), &_left_side_eos,
                 std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*, bool*)) &
                                   ExecNode::get_next,
                           _children[0], std::placeholders::_1, 
std::placeholders::_2,
                           std::placeholders::_3)));
 
-    } while (_left_block.rows() == 0 && !_left_side_eos);
+    } while (_left_block->rows() == 0 && !_left_side_eos);
 
     return Status::OK();
 }
@@ -270,7 +272,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, 
Block* block, bool* eo
     RETURN_IF_CANCELLED(state);
     while (need_more_input_data()) {
         RETURN_IF_ERROR(_fresh_left_block(state));
-        RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
+        RETURN_IF_ERROR(push(state, _left_block.get(), _left_side_eos));
     }
 
     return pull(state, block, eos);
@@ -280,7 +282,7 @@ void 
VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
     auto& dst_columns = mutable_block.mutable_columns();
     DCHECK(_is_mark_join);
     for (size_t i = 0; i < _num_probe_side_columns; ++i) {
-        const ColumnWithTypeAndName& src_column = 
_left_block.get_by_position(i);
+        const ColumnWithTypeAndName& src_column = 
_left_block->get_by_position(i);
         if (!src_column.column->is_nullable() && 
dst_columns[i]->is_nullable()) {
             auto origin_sz = dst_columns[i]->size();
             DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN);
@@ -310,7 +312,7 @@ void 
VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
     auto& dst_columns = mutable_block.mutable_columns();
     const int max_added_rows = now_process_build_block.rows();
     for (size_t i = 0; i < _num_probe_side_columns; ++i) {
-        const ColumnWithTypeAndName& src_column = 
_left_block.get_by_position(i);
+        const ColumnWithTypeAndName& src_column = 
_left_block->get_by_position(i);
         if (!src_column.column->is_nullable() && 
dst_columns[i]->is_nullable()) {
             auto origin_sz = dst_columns[i]->size();
             DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN);
@@ -456,13 +458,13 @@ void 
VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
     } else {
         if (!_is_mark_join) {
             auto new_size = column_size;
-            DCHECK_LE(_left_block_start_pos + _left_side_process_count, 
_left_block.rows());
+            DCHECK_LE(_left_block_start_pos + _left_side_process_count, 
_left_block->rows());
             for (int j = _left_block_start_pos;
                  j < _left_block_start_pos + _left_side_process_count; ++j) {
                 if (_cur_probe_row_visited_flags[j] == IsSemi) {
                     new_size++;
                     for (size_t i = 0; i < _num_probe_side_columns; ++i) {
-                        const ColumnWithTypeAndName src_column = 
_left_block.get_by_position(i);
+                        const ColumnWithTypeAndName src_column = 
_left_block->get_by_position(i);
                         if (!src_column.column->is_nullable() && 
dst_columns[i]->is_nullable()) {
                             DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
                             assert_cast<ColumnNullable*>(dst_columns[i].get())
@@ -488,13 +490,13 @@ void 
VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
         } else {
             ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 
1]);
             mark_column.reserve(mark_column.size() + _left_side_process_count);
-            DCHECK_LE(_left_block_start_pos + _left_side_process_count, 
_left_block.rows());
+            DCHECK_LE(_left_block_start_pos + _left_side_process_count, 
_left_block->rows());
             for (int j = _left_block_start_pos;
                  j < _left_block_start_pos + _left_side_process_count; ++j) {
                 mark_column.insert_value(IsSemi == 
_cur_probe_row_visited_flags[j]);
             }
             for (size_t i = 0; i < _num_probe_side_columns; ++i) {
-                const ColumnWithTypeAndName src_column = 
_left_block.get_by_position(i);
+                const ColumnWithTypeAndName src_column = 
_left_block->get_by_position(i);
                 DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN);
                 dst_columns[i]->insert_range_from(*src_column.column, 
_left_block_start_pos,
                                                   _left_side_process_count);
@@ -541,7 +543,7 @@ void 
VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
     }
     if constexpr (SetProbeSideFlag) {
         int end = filter.size();
-        for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 
1 : _left_block_pos;
+        for (int i = _left_block_pos == _left_block->rows() ? _left_block_pos 
- 1 : _left_block_pos;
              i >= _left_block_start_pos; i--) {
             int offset = 0;
             if (!_probe_offset_stack.empty()) {
@@ -648,7 +650,7 @@ void VNestedLoopJoinNode::debug_string(int 
indentation_level, std::stringstream*
 }
 
 void VNestedLoopJoinNode::_release_mem() {
-    _left_block.clear();
+    _left_block->clear();
 
     Blocks tmp_build_blocks;
     _build_blocks.swap(tmp_build_blocks);
@@ -664,7 +666,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, 
vectorized::Block* block,
     SCOPED_TIMER(_exec_timer);
     SCOPED_TIMER(_probe_timer);
     if (_is_output_left_side_only) {
-        RETURN_IF_ERROR(_build_output_block(&_left_block, block));
+        RETURN_IF_ERROR(_build_output_block(_left_block.get(), block));
         *eos = _left_side_eos;
         _need_more_input_data = !_left_side_eos;
     } else {
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h 
b/be/src/vec/exec/join/vnested_loop_join_node.h
index b309485db51..810bf57e7f5 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -95,7 +95,7 @@ public:
                        : *_output_row_desc;
     }
 
-    Block* get_left_block() { return &_left_block; }
+    std::shared_ptr<Block> get_left_block() { return _left_block; }
 
     std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return 
_runtime_filter_descs; }
     VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; }
@@ -120,14 +120,14 @@ private:
             // We should try to join rows if there still are some rows from 
probe side.
             while (_join_block.rows() < state->batch_size()) {
                 while (_current_build_pos == _build_blocks.size() ||
-                       _left_block_pos == _left_block.rows()) {
+                       _left_block_pos == _left_block->rows()) {
                     // if left block is empty(), do not need disprocess the 
left block rows
-                    if (_left_block.rows() > _left_block_pos) {
+                    if (_left_block->rows() > _left_block_pos) {
                         _left_side_process_count++;
                     }
 
                     _reset_with_next_probe_row();
-                    if (_left_block_pos < _left_block.rows()) {
+                    if (_left_block_pos < _left_block->rows()) {
                         if constexpr (set_probe_side_flag) {
                             
_probe_offset_stack.push(mutable_join_block.rows());
                         }
@@ -260,7 +260,7 @@ private:
     // _left_block must be cleared before calling get_next().  The child node
     // does not initialize all tuple ptrs in the row, only the ones that it
     // is responsible for.
-    Block _left_block;
+    std::shared_ptr<Block> _left_block;
 
     int _left_block_start_pos = 0;
     int _left_block_pos; // current scan pos in _left_block
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index aaa7ed17d00..1765c1dcf7f 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -53,7 +53,9 @@ VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& 
tnode, const Descrip
           _grouping_list(tnode.repeat_node.grouping_list),
           _output_tuple_id(tnode.repeat_node.output_tuple_id),
           _child_eos(false),
-          _repeat_id_idx(0) {}
+          _repeat_id_idx(0) {
+    _child_block = Block::create_shared();
+}
 
 Status VRepeatNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
@@ -189,12 +191,12 @@ Status VRepeatNode::pull(doris::RuntimeState* state, 
vectorized::Block* output_b
         int size = _repeat_id_list.size();
         if (_repeat_id_idx >= size) {
             _intermediate_block->clear();
-            release_block_memory(_child_block);
+            release_block_memory(*_child_block);
             _repeat_id_idx = 0;
         }
     }
     RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, 
output_block->columns()));
-    *eos = _child_eos && _child_block.rows() == 0;
+    *eos = _child_eos && _child_block->rows() == 0;
     reached_limit(output_block, eos);
     COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     return Status::OK();
@@ -225,7 +227,7 @@ Status VRepeatNode::push(RuntimeState* state, 
vectorized::Block* input_block, bo
 }
 
 bool VRepeatNode::need_more_input_data() const {
-    return !_child_block.rows() && !_child_eos;
+    return !_child_block->rows() && !_child_eos;
 }
 
 Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
@@ -243,13 +245,13 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
     DCHECK(block->rows() == 0);
     while (need_more_input_data()) {
         RETURN_IF_ERROR(child(0)->get_next_after_projects(
-                state, &_child_block, &_child_eos,
+                state, _child_block.get(), &_child_eos,
                 std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*, bool*)) &
                                   ExecNode::get_next,
                           _children[0], std::placeholders::_1, 
std::placeholders::_2,
                           std::placeholders::_3)));
 
-        static_cast<void>(push(state, &_child_block, _child_eos));
+        static_cast<void>(push(state, _child_block.get(), _child_eos));
     }
 
     return pull(state, block, eos);
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index 837b4c8aca1..94737580031 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -57,7 +57,7 @@ public:
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* 
eos) override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
override;
     bool need_more_input_data() const;
-    Block* get_child_block() { return &_child_block; }
+    std::shared_ptr<Block> get_child_block() { return _child_block; }
 
     void debug_string(int indentation_level, std::stringstream* out) const 
override;
 
@@ -74,7 +74,7 @@ private:
     TupleId _output_tuple_id;
     const TupleDescriptor* _output_tuple_desc;
 
-    Block _child_block;
+    std::shared_ptr<Block> _child_block;
     std::unique_ptr<Block> _intermediate_block {};
 
     std::vector<SlotDescriptor*> _output_slots;
diff --git a/be/src/vec/exec/vtable_function_node.cpp 
b/be/src/vec/exec/vtable_function_node.cpp
index 23da667b7d6..be93bde0295 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -45,7 +45,9 @@ namespace doris::vectorized {
 
 VTableFunctionNode::VTableFunctionNode(doris::ObjectPool* pool, const 
TPlanNode& tnode,
                                        const DescriptorTbl& descs)
-        : ExecNode(pool, tnode, descs) {}
+        : ExecNode(pool, tnode, descs) {
+    _child_block = Block::create_shared();
+}
 
 Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
@@ -144,12 +146,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, 
Block* block, bool* eos
     // if child_block is empty, get data from child.
     while (need_more_input_data()) {
         RETURN_IF_ERROR(child(0)->get_next_after_projects(
-                state, &_child_block, &_child_eos,
+                state, _child_block.get(), &_child_eos,
                 std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) 
& ExecNode::get_next,
                           _children[0], std::placeholders::_1, 
std::placeholders::_2,
                           std::placeholders::_3)));
 
-        RETURN_IF_ERROR(push(state, &_child_block, _child_eos));
+        RETURN_IF_ERROR(push(state, _child_block.get(), _child_eos));
     }
 
     return pull(state, block, eos);
@@ -170,7 +172,7 @@ Status 
VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
         RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while 
getting next batch."));
 
-        if (_child_block.rows() == 0) {
+        if (_child_block->rows() == 0) {
             break;
         }
 
@@ -227,13 +229,13 @@ Status 
VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
 Status VTableFunctionNode::_process_next_child_row() {
     _cur_child_offset++;
 
-    if (_cur_child_offset >= _child_block.rows()) {
+    if (_cur_child_offset >= _child_block->rows()) {
         // release block use count.
         for (TableFunction* fn : _fns) {
             RETURN_IF_ERROR(fn->process_close());
         }
 
-        release_block_memory(_child_block);
+        release_block_memory(*_child_block);
         _cur_child_offset = -1;
         return Status::OK();
     }
diff --git a/be/src/vec/exec/vtable_function_node.h 
b/be/src/vec/exec/vtable_function_node.h
index 56d1d50330d..bcee8ced50f 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -62,7 +62,7 @@ public:
         return VExpr::open(_vfn_ctxs, state);
     }
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-    bool need_more_input_data() const { return !_child_block.rows() && 
!_child_eos; }
+    bool need_more_input_data() const { return !_child_block->rows() && 
!_child_eos; }
 
     void release_resource(doris::RuntimeState* state) override {
         if (_num_rows_filtered_counter != nullptr) {
@@ -92,7 +92,7 @@ public:
         return Status::OK();
     }
 
-    Block* get_child_block() { return &_child_block; }
+    std::shared_ptr<Block> get_child_block() { return _child_block; }
 
 private:
     Status _prepare_output_slot_ids(const TPlanNode& tnode);
@@ -135,7 +135,7 @@ private:
             return;
         }
         for (auto index : _output_slot_indexs) {
-            auto src_column = _child_block.get_by_position(index).column;
+            auto src_column = _child_block->get_by_position(index).column;
             columns[index]->insert_many_from(*src_column, _cur_child_offset,
                                              _current_row_insert_times);
         }
@@ -143,7 +143,7 @@ private:
     }
     int _current_row_insert_times = 0;
 
-    Block _child_block;
+    std::shared_ptr<Block> _child_block;
     std::vector<SlotDescriptor*> _child_slots;
     std::vector<SlotDescriptor*> _output_slots;
     int64_t _cur_child_offset = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to