This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7c67fa8651 [Bug](pipeline) fix bug of right anti join error result in pipeline (#15165)
7c67fa8651 is described below
commit 7c67fa865169494dc9ea1e48aa2d3d20d037e794
Author: HappenLee <[email protected]>
AuthorDate: Mon Dec 19 19:28:44 2022 +0800
[Bug](pipeline) fix bug of right anti join error result in pipeline (#15165)
---
be/src/pipeline/exec/operator.h | 2 +-
be/src/vec/exec/join/vhash_join_node.cpp | 55 +++++++++++++++++---------------
2 files changed, 30 insertions(+), 27 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index bbbb42efda..4cd8346f88 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -428,7 +428,7 @@ public:
if (node->need_more_input_data()) {
RETURN_IF_ERROR(child->get_block(state, _child_block.get(),
_child_source_state));
source_state = _child_source_state;
- if (_child_block->rows() == 0) {
+ if (_child_block->rows() == 0 && source_state != SourceState::FINISHED) {
return Status::OK();
}
node->prepare_for_next();
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index c76021c64b..586ee87c6f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -551,34 +551,37 @@ Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* out
return Status::OK();
}
-Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_block, bool /*eos*/) {
- COUNTER_UPDATE(_probe_rows_counter, _probe_block.rows());
- int probe_expr_ctxs_sz = _probe_expr_ctxs.size();
- _probe_columns.resize(probe_expr_ctxs_sz);
-
- std::vector<int> res_col_ids(probe_expr_ctxs_sz);
- RETURN_IF_ERROR(
- _do_evaluate(*input_block, _probe_expr_ctxs,
*_probe_expr_call_timer, res_col_ids));
- if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN) {
- _probe_column_convert_to_null = _convert_block_to_null(*input_block);
- }
- // TODO: Now we are not sure whether a column is nullable only by
ExecNode's `row_desc`
- // so we have to initialize this flag by the first probe block.
- if (!_has_set_need_null_map_for_probe) {
- _has_set_need_null_map_for_probe = true;
- _need_null_map_for_probe = _need_probe_null_map(*input_block,
res_col_ids);
- }
- if (_need_null_map_for_probe) {
- if (_null_map_column == nullptr) {
- _null_map_column = ColumnUInt8::create();
+Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_block, bool eos) {
+ _probe_eos = eos;
+ if (input_block->rows() > 0) {
+ COUNTER_UPDATE(_probe_rows_counter, _probe_block.rows());
+ int probe_expr_ctxs_sz = _probe_expr_ctxs.size();
+ _probe_columns.resize(probe_expr_ctxs_sz);
+
+ std::vector<int> res_col_ids(probe_expr_ctxs_sz);
+ RETURN_IF_ERROR(
+ _do_evaluate(*input_block, _probe_expr_ctxs,
*_probe_expr_call_timer, res_col_ids));
+ if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN) {
+ _probe_column_convert_to_null =
_convert_block_to_null(*input_block);
+ }
+ // TODO: Now we are not sure whether a column is nullable only by
ExecNode's `row_desc`
+ // so we have to initialize this flag by the first probe block.
+ if (!_has_set_need_null_map_for_probe) {
+ _has_set_need_null_map_for_probe = true;
+ _need_null_map_for_probe = _need_probe_null_map(*input_block,
res_col_ids);
+ }
+ if (_need_null_map_for_probe) {
+ if (_null_map_column == nullptr) {
+ _null_map_column = ColumnUInt8::create();
+ }
+ _null_map_column->get_data().assign(input_block->rows(),
(uint8_t)0);
}
- _null_map_column->get_data().assign(input_block->rows(), (uint8_t)0);
- }
- RETURN_IF_ERROR(_extract_join_column<false>(*input_block,
_null_map_column, _probe_columns,
- res_col_ids));
- if (&_probe_block != input_block) {
- input_block->swap(_probe_block);
+ RETURN_IF_ERROR(_extract_join_column<false>(*input_block,
_null_map_column, _probe_columns,
+ res_col_ids));
+ if (&_probe_block != input_block) {
+ input_block->swap(_probe_block);
+ }
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]