This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8ecf69b09b [pipeline](regression) nested loop join test get error
result in pipeline engine and refactor the code for need more input data
(#15208)
8ecf69b09b is described below
commit 8ecf69b09b034612967223a35cb65bad5c742967
Author: HappenLee <[email protected]>
AuthorDate: Wed Dec 21 19:03:51 2022 +0800
[pipeline](regression) nested loop join test get error result in pipeline
engine and refactor the code for need more input data (#15208)
---
be/src/pipeline/exec/operator.h | 22 +++---
be/src/pipeline/exec/repeat_operator.cpp | 11 +++
be/src/pipeline/exec/repeat_operator.h | 4 ++
be/src/vec/exec/join/vhash_join_node.cpp | 29 ++++----
be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +-
be/src/vec/exec/scan/vscan_node.cpp | 3 +-
be/src/vec/exec/vrepeat_node.cpp | 90 +++++++++++--------------
be/src/vec/exec/vrepeat_node.h | 5 +-
be/src/vec/exec/vtable_function_node.cpp | 34 ++++------
be/src/vec/exec/vtable_function_node.h | 4 +-
10 files changed, 95 insertions(+), 109 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 652486aa34..58c36b3a51 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -432,24 +432,24 @@ public:
auto& child = StreamingOperator<OperatorBuilderType>::_child;
if (node->need_more_input_data()) {
+ _child_block->clear_column_data();
RETURN_IF_ERROR(child->get_block(state, _child_block.get(),
_child_source_state));
source_state = _child_source_state;
- if (_child_block->rows() == 0 && source_state !=
SourceState::FINISHED) {
+ if (_child_block->rows() == 0 && _child_source_state !=
SourceState::FINISHED) {
return Status::OK();
}
node->prepare_for_next();
- node->push(state, _child_block.get(), source_state ==
SourceState::FINISHED);
+ node->push(state, _child_block.get(), _child_source_state ==
SourceState::FINISHED);
}
- bool eos = false;
- RETURN_IF_ERROR(node->pull(state, block, &eos));
- if (eos) {
- source_state = SourceState::FINISHED;
- _child_block->clear_column_data();
- } else if (!node->need_more_input_data()) {
- source_state = SourceState::MORE_DATA;
- } else {
- _child_block->clear_column_data();
+ if (!node->need_more_input_data()) {
+ bool eos = false;
+ RETURN_IF_ERROR(node->pull(state, block, &eos));
+ if (eos) {
+ source_state = SourceState::FINISHED;
+ } else if (!node->need_more_input_data()) {
+ source_state = SourceState::MORE_DATA;
+ }
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index def1f6da9d..d2c9f0a1e2 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -23,4 +23,15 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)
+Status RepeatOperator::prepare(doris::RuntimeState* state) {
+ // Speed-up hack (dangerous): alias _child_block to the repeat node's own Block member (get_child_block() returns &_child_block), so this unique_ptr must never free it; close() calls release() for that reason. NOTE(review): if the operator is destroyed without close(), a non-owner would delete the Block, and this assumes StatefulOperator::prepare() does not reset _child_block -- confirm both.
+ _child_block.reset(_node->get_child_block());
+ return StatefulOperator::prepare(state);
+}
+
+Status RepeatOperator::close(doris::RuntimeState* state) {
+ _child_block.release(); // drop the alias set in prepare() WITHOUT deleting it: the Block is a member of the repeat node
+ return StatefulOperator::close(state);
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/repeat_operator.h
b/be/src/pipeline/exec/repeat_operator.h
index 15707ea39c..b397ea05d3 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -37,6 +37,10 @@ public:
class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> {
public:
RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode*
repeat_node);
+
+ Status prepare(RuntimeState* state) override; // aliases the inherited _child_block to the repeat node's internal child Block
+
+ Status close(RuntimeState* state) override; // releases (does not delete) that aliased Block before closing
};
} // namespace pipeline
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 337aec7b1e..fc2a7ed4dc 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -597,24 +597,19 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
*eos = true;
return Status::OK();
}
- if (need_more_input_data()) {
+ while (need_more_input_data()) {
prepare_for_next();
- do {
- SCOPED_TIMER(_probe_next_timer);
- RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(0)->get_next_after_projects(
- state, &_probe_block, &_probe_eos,
- std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*,
- bool*)) &
- ExecNode::get_next,
- _children[0], std::placeholders::_1,
std::placeholders::_2,
- std::placeholders::_3)),
- child(0)->get_next_span(), _probe_eos);
- } while (_probe_block.rows() == 0 && !_probe_eos);
-
- if (_probe_block.rows() != 0) {
- RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos));
- }
+ SCOPED_TIMER(_probe_next_timer);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(0)->get_next_after_projects(
+ state, &_probe_block, &_probe_eos,
+ std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*, bool*)) &
+ ExecNode::get_next,
+ _children[0], std::placeholders::_1,
std::placeholders::_2,
+ std::placeholders::_3)),
+ child(0)->get_next_span(), _probe_eos);
+
+ RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos));
}
return pull(state, output_block, eos);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 82cb6f8605..7fc43fcf19 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -671,7 +671,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state,
vectorized::Block* block,
}
bool VNestedLoopJoinNode::need_more_input_data() const {
- return _need_more_input_data;
+ return _need_more_input_data and !_left_side_eos; // never request more input once _left_side_eos is set, even if _need_more_input_data is still true
}
void VNestedLoopJoinNode::release_resource(doris::RuntimeState* state) {
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 8d436bb733..2f35cd47fc 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -262,8 +262,7 @@ Status VScanNode::_acquire_runtime_filter(bool wait) {
!_runtime_filter_ctxs[i].apply_mark) {
_blocked_by_rf = true;
} else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(!_blocked_by_rf &&
- runtime_filter->current_state() !=
RuntimeFilterState::NOT_READY);
+ DCHECK(runtime_filter->current_state() !=
RuntimeFilterState::NOT_READY);
_is_all_rf_applied = false;
}
}
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 961e5aad4b..01db7d29ef 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -55,7 +55,6 @@ Status VRepeatNode::prepare(RuntimeState* state) {
for (const auto& slot_desc : _output_tuple_desc->slots()) {
_output_slots.push_back(slot_desc);
}
- _child_block.reset(new Block());
return Status::OK();
}
@@ -181,50 +180,51 @@ Status VRepeatNode::pull(doris::RuntimeState* state,
vectorized::Block* output_b
}
DCHECK(output_block->rows() == 0);
- if (!_intermediate_block || _intermediate_block->rows() == 0) {
- return Status::OK();
- }
+ if (_intermediate_block && _intermediate_block->rows() > 0) {
+ RETURN_IF_ERROR(
+ get_repeated_block(_intermediate_block.get(), _repeat_id_idx,
output_block));
- RETURN_IF_ERROR(get_repeated_block(_intermediate_block.get(),
_repeat_id_idx, output_block));
+ _repeat_id_idx++;
- _repeat_id_idx++;
-
- int size = _repeat_id_list.size();
- if (_repeat_id_idx >= size) {
- _intermediate_block->clear();
- release_block_memory(*_child_block);
- _repeat_id_idx = 0;
+ int size = _repeat_id_list.size();
+ if (_repeat_id_idx >= size) {
+ _intermediate_block->clear();
+ release_block_memory(_child_block);
+ _repeat_id_idx = 0;
+ }
}
+ *eos = _child_eos && _child_block.rows() == 0;
reached_limit(output_block, eos);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
return Status::OK();
}
Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block,
bool eos) {
- if (input_block->rows() == 0) {
- return Status::OK();
- }
+ _child_eos = eos; // record eos before the row check so an empty final block still ends the stream; pull() and need_more_input_data() read _child_eos
DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
DCHECK(!_expr_ctxs.empty());
- _intermediate_block.reset(new Block());
-
- for (auto expr : _expr_ctxs) {
- int result_column_id = -1;
- RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
- DCHECK(result_column_id != -1);
- input_block->get_by_position(result_column_id).column =
- input_block->get_by_position(result_column_id)
- .column->convert_to_full_column_if_const();
-
_intermediate_block->insert(input_block->get_by_position(result_column_id));
+
+ if (input_block->rows() > 0) { // only non-empty input builds a new _intermediate_block from the expr result columns
+ _intermediate_block.reset(new Block());
+
+ for (auto expr : _expr_ctxs) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
+ DCHECK(result_column_id != -1);
+ input_block->get_by_position(result_column_id).column =
+ input_block->get_by_position(result_column_id)
+ .column->convert_to_full_column_if_const();
+
_intermediate_block->insert(input_block->get_by_position(result_column_id));
+ }
+ DCHECK_EQ(_expr_ctxs.size(), _intermediate_block->columns());
}
- DCHECK_EQ(_expr_ctxs.size(), _intermediate_block->columns());
return Status::OK();
}
bool VRepeatNode::need_more_input_data() {
- return !_intermediate_block || _intermediate_block->rows() == 0;
+ return !_child_block.rows() && !_child_eos; // pull from the child only while _child_block holds no rows and push() has not recorded eos
}
Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
@@ -241,26 +241,17 @@ Status VRepeatNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
DCHECK(_repeat_id_idx <= (int)v.size());
}
DCHECK(block->rows() == 0);
-
- if (need_more_input_data()) {
- while (_child_block->rows() == 0 && !_child_eos) {
- RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(0)->get_next_after_projects(
- state, _child_block.get(), &_child_eos,
- std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*,
- bool*)) &
- ExecNode::get_next,
- _children[0], std::placeholders::_1,
std::placeholders::_2,
- std::placeholders::_3)),
- child(0)->get_next_span(), _child_eos);
- }
-
- if (_child_eos and _child_block->rows() == 0) {
- *eos = true;
- return Status::OK();
- }
-
- push(state, _child_block.get(), *eos);
+ while (need_more_input_data()) {
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(0)->get_next_after_projects(
+ state, &_child_block, &_child_eos,
+ std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*, bool*)) &
+ ExecNode::get_next,
+ _children[0], std::placeholders::_1,
std::placeholders::_2,
+ std::placeholders::_3)),
+ child(0)->get_next_span(), _child_eos);
+
+ push(state, &_child_block, _child_eos);
}
return pull(state, block, eos);
@@ -294,9 +285,4 @@ void VRepeatNode::debug_string(int indentation_level,
std::stringstream* out) co
*out << ")";
}
-void VRepeatNode::_release_mem() {
- _child_block = nullptr;
- _intermediate_block = nullptr;
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index 53eb025cc1..394690d729 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -46,6 +46,7 @@ public:
Status pull(RuntimeState* state, vectorized::Block* output_block, bool*
eos) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
override;
bool need_more_input_data();
+ Block* get_child_block() { return &_child_block; } // non-owning: points at this node's _child_block member; callers must not delete it
protected:
virtual void debug_string(int indentation_level, std::stringstream* out)
const override;
@@ -53,8 +54,6 @@ protected:
private:
Status get_repeated_block(Block* child_block, int repeat_id_idx, Block*
output_block);
- void _release_mem();
-
// Slot id set used to indicate those slots need to set to null.
std::vector<std::set<SlotId>> _slot_id_set_list;
// all slot id
@@ -65,7 +64,7 @@ private:
TupleId _output_tuple_id;
const TupleDescriptor* _output_tuple_desc;
- std::unique_ptr<Block> _child_block {};
+ Block _child_block;
std::unique_ptr<Block> _intermediate_block {};
std::vector<SlotDescriptor*> _output_slots;
diff --git a/be/src/vec/exec/vtable_function_node.cpp
b/be/src/vec/exec/vtable_function_node.cpp
index 9336040f9a..c26bfdba21 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -83,29 +83,20 @@ Status VTableFunctionNode::get_next(RuntimeState* state,
Block* block, bool* eos
RETURN_IF_CANCELLED(state);
// if child_block is empty, get data from child.
- if (need_more_input_data()) {
- while (_child_block.rows() == 0 && !_child_eos) {
- RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(0)->get_next_after_projects(
- state, &_child_block, &_child_eos,
- std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*,
- bool*)) &
- ExecNode::get_next,
- _children[0], std::placeholders::_1,
std::placeholders::_2,
- std::placeholders::_3)),
- child(0)->get_next_span(), _child_eos);
- }
- if (_child_eos && _child_block.rows() == 0) {
- *eos = true;
- return Status::OK();
- }
-
- push(state, &_child_block, *eos);
+ while (need_more_input_data()) {
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(0)->get_next_after_projects(
+ state, &_child_block, &_child_eos,
+ std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*, bool*)) &
+ ExecNode::get_next,
+ _children[0], std::placeholders::_1,
std::placeholders::_2,
+ std::placeholders::_3)),
+ child(0)->get_next_span(), _child_eos);
+
+ push(state, &_child_block, _child_eos);
}
- pull(state, block, eos);
-
- return Status::OK();
+ return pull(state, block, eos);
}
Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block*
output_block, bool* eos) {
@@ -204,6 +195,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState*
state, Block* output
RETURN_IF_ERROR(
VExprContext::filter_block(_vconjunct_ctx_ptr, output_block,
output_block->columns()));
+ *eos = _child_eos && _cur_child_offset == -1;
return Status::OK();
}
diff --git a/be/src/vec/exec/vtable_function_node.h
b/be/src/vec/exec/vtable_function_node.h
index 451a35c739..c831e55856 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -30,10 +30,10 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
Status prepare(RuntimeState* state) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-
- bool need_more_input_data() { return !_child_block.rows(); }
+ bool need_more_input_data() { return !_child_block.rows() && !_child_eos; } // input is needed only while _child_block is empty and push() has not recorded eos
Status push(RuntimeState*, vectorized::Block* input_block, bool eos)
override {
+ _child_eos = eos;
if (input_block->rows() == 0) {
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]