This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new abc802b5ba9 [bugfix](core) child block is shared between operator and
node, it should be shared ptr (#28106)
abc802b5ba9 is described below
commit abc802b5ba93bf9dc868ce5996ab591bf3b64dcd
Author: yiguolei <[email protected]>
AuthorDate: Sat Dec 9 00:18:14 2023 +0800
[bugfix](core) child block is shared between operator and node, it should
be shared ptr (#28106)
_child_block in the nested loop join, table function, and repeat nodes is
shared between the ExecNode and its related operator. It should not be a
unique ptr in the operator, because the block belongs to the exec node.
The block will be freed twice if the operator's close method is not called
correctly.
It should be a shared ptr, so that it will not core dump even if the
operator's close method is not called.
---
.../exec/nested_loop_join_probe_operator.cpp | 3 +--
be/src/pipeline/exec/operator.h | 4 +--
be/src/pipeline/exec/repeat_operator.cpp | 3 +--
be/src/pipeline/exec/table_function_operator.cpp | 3 +--
be/src/vec/exec/join/vnested_loop_join_node.cpp | 30 ++++++++++++----------
be/src/vec/exec/join/vnested_loop_join_node.h | 10 ++++----
be/src/vec/exec/vrepeat_node.cpp | 14 +++++-----
be/src/vec/exec/vrepeat_node.h | 4 +--
be/src/vec/exec/vtable_function_node.cpp | 14 +++++-----
be/src/vec/exec/vtable_function_node.h | 8 +++---
10 files changed, 48 insertions(+), 45 deletions(-)
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index a3d24aa0614..b7477d0b4f8 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(NestLoopJoinProbeOperator,
StatefulOperator)
Status NestLoopJoinProbeOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
- _child_block.reset(_node->get_left_block());
+ _child_block = _node->get_left_block();
return StatefulOperator::prepare(state);
}
Status NestLoopJoinProbeOperator::close(doris::RuntimeState* state) {
- _child_block.release();
return StatefulOperator::close(state);
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 2beea932a8b..7ff91fba695 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -444,7 +444,7 @@ public:
StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
: StreamingOperator<OperatorBuilderType>(builder, node),
- _child_block(vectorized::Block::create_unique()),
+ _child_block(vectorized::Block::create_shared()),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {}
virtual ~StatefulOperator() = default;
@@ -484,7 +484,7 @@ public:
}
protected:
- std::unique_ptr<vectorized::Block> _child_block;
+ std::shared_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
};
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index 40f6f5d7b21..d1613a6125f 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)
Status RepeatOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
- _child_block.reset(_node->get_child_block());
+ _child_block = _node->get_child_block();
return StatefulOperator::prepare(state);
}
Status RepeatOperator::close(doris::RuntimeState* state) {
- _child_block.release();
return StatefulOperator::close(state);
}
diff --git a/be/src/pipeline/exec/table_function_operator.cpp
b/be/src/pipeline/exec/table_function_operator.cpp
index 5e1d5c281eb..6e947c640c9 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -33,12 +33,11 @@ OPERATOR_CODE_GENERATOR(TableFunctionOperator,
StatefulOperator)
Status TableFunctionOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
- _child_block.reset(_node->get_child_block());
+ _child_block = _node->get_child_block();
return StatefulOperator::prepare(state);
}
Status TableFunctionOperator::close(doris::RuntimeState* state) {
- _child_block.release();
return StatefulOperator::close(state);
}
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index f3214639856..7d8100aa6ca 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -97,7 +97,9 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool,
const TPlanNode& tnod
_left_block_pos(0),
_left_side_eos(false),
_old_version_flag(!tnode.__isset.nested_loop_join_node),
- _runtime_filter_descs(tnode.runtime_filters) {}
+ _runtime_filter_descs(tnode.runtime_filters) {
+ _left_block = Block::create_shared();
+}
Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state));
@@ -252,15 +254,15 @@ Status VNestedLoopJoinNode::push(doris::RuntimeState*
state, vectorized::Block*
Status VNestedLoopJoinNode::_fresh_left_block(doris::RuntimeState* state) {
do {
- release_block_memory(_left_block);
+ release_block_memory(*_left_block);
RETURN_IF_ERROR(child(0)->get_next_after_projects(
- state, &_left_block, &_left_side_eos,
+ state, _left_block.get(), &_left_side_eos,
std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*, bool*)) &
ExecNode::get_next,
_children[0], std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)));
- } while (_left_block.rows() == 0 && !_left_side_eos);
+ } while (_left_block->rows() == 0 && !_left_side_eos);
return Status::OK();
}
@@ -270,7 +272,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state,
Block* block, bool* eo
RETURN_IF_CANCELLED(state);
while (need_more_input_data()) {
RETURN_IF_ERROR(_fresh_left_block(state));
- RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
+ RETURN_IF_ERROR(push(state, _left_block.get(), _left_side_eos));
}
return pull(state, block, eos);
@@ -280,7 +282,7 @@ void
VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
auto& dst_columns = mutable_block.mutable_columns();
DCHECK(_is_mark_join);
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
- const ColumnWithTypeAndName& src_column =
_left_block.get_by_position(i);
+ const ColumnWithTypeAndName& src_column =
_left_block->get_by_position(i);
if (!src_column.column->is_nullable() &&
dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN);
@@ -310,7 +312,7 @@ void
VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
auto& dst_columns = mutable_block.mutable_columns();
const int max_added_rows = now_process_build_block.rows();
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
- const ColumnWithTypeAndName& src_column =
_left_block.get_by_position(i);
+ const ColumnWithTypeAndName& src_column =
_left_block->get_by_position(i);
if (!src_column.column->is_nullable() &&
dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN);
@@ -456,13 +458,13 @@ void
VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
} else {
if (!_is_mark_join) {
auto new_size = column_size;
- DCHECK_LE(_left_block_start_pos + _left_side_process_count,
_left_block.rows());
+ DCHECK_LE(_left_block_start_pos + _left_side_process_count,
_left_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
if (_cur_probe_row_visited_flags[j] == IsSemi) {
new_size++;
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
- const ColumnWithTypeAndName src_column =
_left_block.get_by_position(i);
+ const ColumnWithTypeAndName src_column =
_left_block->get_by_position(i);
if (!src_column.column->is_nullable() &&
dst_columns[i]->is_nullable()) {
DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
assert_cast<ColumnNullable*>(dst_columns[i].get())
@@ -488,13 +490,13 @@ void
VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
} else {
ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() -
1]);
mark_column.reserve(mark_column.size() + _left_side_process_count);
- DCHECK_LE(_left_block_start_pos + _left_side_process_count,
_left_block.rows());
+ DCHECK_LE(_left_block_start_pos + _left_side_process_count,
_left_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
mark_column.insert_value(IsSemi ==
_cur_probe_row_visited_flags[j]);
}
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
- const ColumnWithTypeAndName src_column =
_left_block.get_by_position(i);
+ const ColumnWithTypeAndName src_column =
_left_block->get_by_position(i);
DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN);
dst_columns[i]->insert_range_from(*src_column.column,
_left_block_start_pos,
_left_side_process_count);
@@ -541,7 +543,7 @@ void
VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
}
if constexpr (SetProbeSideFlag) {
int end = filter.size();
- for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos -
1 : _left_block_pos;
+ for (int i = _left_block_pos == _left_block->rows() ? _left_block_pos
- 1 : _left_block_pos;
i >= _left_block_start_pos; i--) {
int offset = 0;
if (!_probe_offset_stack.empty()) {
@@ -648,7 +650,7 @@ void VNestedLoopJoinNode::debug_string(int
indentation_level, std::stringstream*
}
void VNestedLoopJoinNode::_release_mem() {
- _left_block.clear();
+ _left_block->clear();
Blocks tmp_build_blocks;
_build_blocks.swap(tmp_build_blocks);
@@ -664,7 +666,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state,
vectorized::Block* block,
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
if (_is_output_left_side_only) {
- RETURN_IF_ERROR(_build_output_block(&_left_block, block));
+ RETURN_IF_ERROR(_build_output_block(_left_block.get(), block));
*eos = _left_side_eos;
_need_more_input_data = !_left_side_eos;
} else {
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h
b/be/src/vec/exec/join/vnested_loop_join_node.h
index b309485db51..810bf57e7f5 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -95,7 +95,7 @@ public:
: *_output_row_desc;
}
- Block* get_left_block() { return &_left_block; }
+ std::shared_ptr<Block> get_left_block() { return _left_block; }
std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return
_runtime_filter_descs; }
VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; }
@@ -120,14 +120,14 @@ private:
// We should try to join rows if there still are some rows from
probe side.
while (_join_block.rows() < state->batch_size()) {
while (_current_build_pos == _build_blocks.size() ||
- _left_block_pos == _left_block.rows()) {
+ _left_block_pos == _left_block->rows()) {
// if left block is empty(), do not need disprocess the
left block rows
- if (_left_block.rows() > _left_block_pos) {
+ if (_left_block->rows() > _left_block_pos) {
_left_side_process_count++;
}
_reset_with_next_probe_row();
- if (_left_block_pos < _left_block.rows()) {
+ if (_left_block_pos < _left_block->rows()) {
if constexpr (set_probe_side_flag) {
_probe_offset_stack.push(mutable_join_block.rows());
}
@@ -260,7 +260,7 @@ private:
// _left_block must be cleared before calling get_next(). The child node
// does not initialize all tuple ptrs in the row, only the ones that it
// is responsible for.
- Block _left_block;
+ std::shared_ptr<Block> _left_block;
int _left_block_start_pos = 0;
int _left_block_pos; // current scan pos in _left_block
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index aaa7ed17d00..1765c1dcf7f 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -53,7 +53,9 @@ VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode&
tnode, const Descrip
_grouping_list(tnode.repeat_node.grouping_list),
_output_tuple_id(tnode.repeat_node.output_tuple_id),
_child_eos(false),
- _repeat_id_idx(0) {}
+ _repeat_id_idx(0) {
+ _child_block = Block::create_shared();
+}
Status VRepeatNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
@@ -189,12 +191,12 @@ Status VRepeatNode::pull(doris::RuntimeState* state,
vectorized::Block* output_b
int size = _repeat_id_list.size();
if (_repeat_id_idx >= size) {
_intermediate_block->clear();
- release_block_memory(_child_block);
+ release_block_memory(*_child_block);
_repeat_id_idx = 0;
}
}
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block,
output_block->columns()));
- *eos = _child_eos && _child_block.rows() == 0;
+ *eos = _child_eos && _child_block->rows() == 0;
reached_limit(output_block, eos);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
return Status::OK();
@@ -225,7 +227,7 @@ Status VRepeatNode::push(RuntimeState* state,
vectorized::Block* input_block, bo
}
bool VRepeatNode::need_more_input_data() const {
- return !_child_block.rows() && !_child_eos;
+ return !_child_block->rows() && !_child_eos;
}
Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
@@ -243,13 +245,13 @@ Status VRepeatNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
DCHECK(block->rows() == 0);
while (need_more_input_data()) {
RETURN_IF_ERROR(child(0)->get_next_after_projects(
- state, &_child_block, &_child_eos,
+ state, _child_block.get(), &_child_eos,
std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*, bool*)) &
ExecNode::get_next,
_children[0], std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)));
- static_cast<void>(push(state, &_child_block, _child_eos));
+ static_cast<void>(push(state, _child_block.get(), _child_eos));
}
return pull(state, block, eos);
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index 837b4c8aca1..94737580031 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -57,7 +57,7 @@ public:
Status pull(RuntimeState* state, vectorized::Block* output_block, bool*
eos) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
override;
bool need_more_input_data() const;
- Block* get_child_block() { return &_child_block; }
+ std::shared_ptr<Block> get_child_block() { return _child_block; }
void debug_string(int indentation_level, std::stringstream* out) const
override;
@@ -74,7 +74,7 @@ private:
TupleId _output_tuple_id;
const TupleDescriptor* _output_tuple_desc;
- Block _child_block;
+ std::shared_ptr<Block> _child_block;
std::unique_ptr<Block> _intermediate_block {};
std::vector<SlotDescriptor*> _output_slots;
diff --git a/be/src/vec/exec/vtable_function_node.cpp
b/be/src/vec/exec/vtable_function_node.cpp
index 23da667b7d6..be93bde0295 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -45,7 +45,9 @@ namespace doris::vectorized {
VTableFunctionNode::VTableFunctionNode(doris::ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs)
- : ExecNode(pool, tnode, descs) {}
+ : ExecNode(pool, tnode, descs) {
+ _child_block = Block::create_shared();
+}
Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
@@ -144,12 +146,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state,
Block* block, bool* eos
// if child_block is empty, get data from child.
while (need_more_input_data()) {
RETURN_IF_ERROR(child(0)->get_next_after_projects(
- state, &_child_block, &_child_eos,
+ state, _child_block.get(), &_child_eos,
std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*))
& ExecNode::get_next,
_children[0], std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)));
- RETURN_IF_ERROR(push(state, &_child_block, _child_eos));
+ RETURN_IF_ERROR(push(state, _child_block.get(), _child_eos));
}
return pull(state, block, eos);
@@ -170,7 +172,7 @@ Status
VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while
getting next batch."));
- if (_child_block.rows() == 0) {
+ if (_child_block->rows() == 0) {
break;
}
@@ -227,13 +229,13 @@ Status
VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
Status VTableFunctionNode::_process_next_child_row() {
_cur_child_offset++;
- if (_cur_child_offset >= _child_block.rows()) {
+ if (_cur_child_offset >= _child_block->rows()) {
// release block use count.
for (TableFunction* fn : _fns) {
RETURN_IF_ERROR(fn->process_close());
}
- release_block_memory(_child_block);
+ release_block_memory(*_child_block);
_cur_child_offset = -1;
return Status::OK();
}
diff --git a/be/src/vec/exec/vtable_function_node.h
b/be/src/vec/exec/vtable_function_node.h
index 56d1d50330d..bcee8ced50f 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -62,7 +62,7 @@ public:
return VExpr::open(_vfn_ctxs, state);
}
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
- bool need_more_input_data() const { return !_child_block.rows() &&
!_child_eos; }
+ bool need_more_input_data() const { return !_child_block->rows() &&
!_child_eos; }
void release_resource(doris::RuntimeState* state) override {
if (_num_rows_filtered_counter != nullptr) {
@@ -92,7 +92,7 @@ public:
return Status::OK();
}
- Block* get_child_block() { return &_child_block; }
+ std::shared_ptr<Block> get_child_block() { return _child_block; }
private:
Status _prepare_output_slot_ids(const TPlanNode& tnode);
@@ -135,7 +135,7 @@ private:
return;
}
for (auto index : _output_slot_indexs) {
- auto src_column = _child_block.get_by_position(index).column;
+ auto src_column = _child_block->get_by_position(index).column;
columns[index]->insert_many_from(*src_column, _cur_child_offset,
_current_row_insert_times);
}
@@ -143,7 +143,7 @@ private:
}
int _current_row_insert_times = 0;
- Block _child_block;
+ std::shared_ptr<Block> _child_block;
std::vector<SlotDescriptor*> _child_slots;
std::vector<SlotDescriptor*> _output_slots;
int64_t _cur_child_offset = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]