This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 86e13e934432bd9abcfc75ac10025b3afa638276
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Aug 30 18:58:59 2023 +0800

    [fix](union) should open/alloc_resource in sink operator instead of source (#23637)
---
 be/src/pipeline/exec/union_sink_operator.h   |  2 --
 be/src/pipeline/exec/union_source_operator.h |  3 +++
 be/src/vec/exec/vunion_node.cpp              | 10 +++++++++-
 be/src/vec/exec/vunion_node.h                |  3 +++
 4 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
index dfe7085d5e..0da75147cc 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -56,8 +56,6 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
-    // this operator in sink open directly return, do this work in source
-    Status open(RuntimeState* /*state*/) override { return Status::OK(); }
 
     Status close(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index 8051bdd512..8bd2f484f3 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -52,6 +52,9 @@ public:
     UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node,
                         std::shared_ptr<DataQueue>);
 
+    // this operator in source open directly return, do this work in sink
+    Status open(RuntimeState* /*state*/) override { return Status::OK(); }
+
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override;
     bool can_read() override;
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index e9caa494fd..90cbe5bb7a 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -106,6 +106,12 @@ Status VUnionNode::open(RuntimeState* state) {
 
 Status VUnionNode::alloc_resource(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+    std::unique_lock<std::mutex> l(_resource_lock);
+    if (_resource_allocated) {
+        return Status::OK();
+    }
+
     // open const expr lists.
     for (const auto& exprs : _const_expr_lists) {
         RETURN_IF_ERROR(VExpr::open(exprs, state));
@@ -114,7 +120,9 @@ Status VUnionNode::alloc_resource(RuntimeState* state) {
     for (const auto& exprs : _child_expr_lists) {
         RETURN_IF_ERROR(VExpr::open(exprs, state));
     }
-    return ExecNode::alloc_resource(state);
+    RETURN_IF_ERROR(ExecNode::alloc_resource(state));
+    _resource_allocated = true;
+    return Status::OK();
 }
 
 Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) {
diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h
index c25bb07102..ac63f9ca63 100644
--- a/be/src/vec/exec/vunion_node.h
+++ b/be/src/vec/exec/vunion_node.h
@@ -91,6 +91,9 @@ private:
     /// to -1 if no child needs to be closed.
     int _to_close_child_idx;
 
+    std::mutex _resource_lock;
+    bool _resource_allocated {false};
+
     // Time spent to evaluates exprs and materializes the results
     RuntimeProfile::Counter* _materialize_exprs_evaluate_timer = nullptr;
     /// GetNext() for the passthrough case. We pass 'block' directly into the GetNext()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to