This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0732f31e5d [Bug](pipeline) Fix bugs for scan node and join node
(#15164)
0732f31e5d is described below
commit 0732f31e5def5c714c907a5fbef4dc6860be354c
Author: Gabriel <[email protected]>
AuthorDate: Mon Dec 19 15:59:29 2022 +0800
[Bug](pipeline) Fix bugs for scan node and join node (#15164)
* [Bug](pipeline) Fix bugs for scan node and join node
* update
---
be/src/pipeline/exec/operator.h | 12 ++++++++++--
be/src/vec/exec/join/vhash_join_node.cpp | 2 +-
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4679832bb1..bbbb42efda 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -237,7 +237,6 @@ protected:
// TODO pipeline Account for peak memory used by this operator
RuntimeProfile::Counter* _memory_used_counter = nullptr;
-private:
bool _is_closed = false;
};
@@ -283,8 +282,13 @@ public:
}
Status close(RuntimeState* state) override {
+ if (is_closed()) {
+ return Status::OK();
+ }
_fresh_exec_timer(_sink);
- return _sink->close(state, Status::OK());
+ RETURN_IF_ERROR(_sink->close(state, Status::OK()));
+ _is_closed = true;
+ return Status::OK();
}
Status finalize(RuntimeState* state) override { return Status::OK(); }
@@ -336,10 +340,14 @@ public:
}
Status close(RuntimeState* state) override {
+ if (is_closed()) {
+ return Status::OK();
+ }
_fresh_exec_timer(_node);
if (!_node->decrease_ref()) {
_node->release_resource(state);
}
+ _is_closed = true;
return Status::OK();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 80e47089f7..c76021c64b 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -572,7 +572,7 @@ Status HashJoinNode::push(RuntimeState* /*state*/,
vectorized::Block* input_bloc
if (_null_map_column == nullptr) {
_null_map_column = ColumnUInt8::create();
}
- _null_map_column->get_data().assign(_probe_block.rows(), (uint8_t)0);
+ _null_map_column->get_data().assign(input_block->rows(), (uint8_t)0);
}
RETURN_IF_ERROR(_extract_join_column<false>(*input_block,
_null_map_column, _probe_columns,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]