This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7037abc6231 [refactor](close)Full refactor async writer (#30082)
7037abc6231 is described below

commit 7037abc6231b2bba7c4ac3e2d4b34ca8a09ec061
Author: yiguolei <[email protected]>
AuthorDate: Tue Jan 23 13:21:17 2024 +0800

    [refactor](close)Full refactor async writer (#30082)
    
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/exec/data_sink.h                            |  4 --
 be/src/exec/exec_node.h                            |  4 --
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 53 ++++++++++----
 be/src/pipeline/exec/exchange_sink_buffer.h        |  5 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 34 +++++----
 be/src/pipeline/exec/exchange_sink_operator.h      |  2 -
 be/src/pipeline/exec/operator.h                    |  6 --
 be/src/pipeline/pipeline_fragment_context.cpp      |  7 +-
 be/src/pipeline/pipeline_fragment_context.h        |  1 -
 be/src/pipeline/pipeline_task.cpp                  | 13 +---
 be/src/pipeline/pipeline_task.h                    |  5 --
 be/src/pipeline/pipeline_x/operator.cpp            | 21 +++---
 be/src/pipeline/pipeline_x/operator.h              | 13 ----
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  6 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 15 +---
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  3 -
 be/src/pipeline/task_scheduler.cpp                 | 82 ++++++++++++----------
 be/src/pipeline/task_scheduler.h                   |  6 +-
 be/src/runtime/task_execution_context.h            |  1 +
 be/src/vec/sink/async_writer_sink.h                | 19 ++---
 be/src/vec/sink/vdata_stream_sender.cpp            | 22 +++---
 be/src/vec/sink/vdata_stream_sender.h              |  2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     | 27 ++++---
 be/src/vec/sink/writer/async_result_writer.h       |  4 +-
 24 files changed, 177 insertions(+), 178 deletions(-)
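
At a glance, this patch removes the two-phase try_close()/close() teardown and
folds the final flush into sink(eos) and close(). A minimal sketch of the
scheduler-side change (illustrative only, not the committed code):

    // Before: the scheduler tore a task down in two steps.
    //   task->try_close(exec_status);          // flush buffers, force_close writers
    //   ... park in PENDING_FINISH until !is_pending_finish() ...
    //   task->close(exec_status);              // release resources
    //
    // After: sinks flush on the last block, so one call suffices.
    Status st = task->close(exec_status);       // close() now checks the writer status itself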

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index be6cfe236b7..c0b27e8ae90 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -70,10 +70,6 @@ public:
         return send(state, block, eos);
     }
 
-    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
-        return Status::OK();
-    }
-
     [[nodiscard]] virtual bool is_pending_finish() const { return false; }
 
     // Releases all resources that were allocated in prepare()/send().
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 903122ecded..5a6b04667e7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -232,10 +232,6 @@ public:
 
     size_t children_count() const { return _children.size(); }
 
-    // when the fragment is normal finished, call this method to do some finish work
-    // such as send the last buffer to remote.
-    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
     std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }
 
 protected:
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 4484a34375b..9d240945be1 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -85,14 +85,15 @@ namespace pipeline {
 
 template <typename Parent>
 ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
-                                               int send_id, int be_number, QueryContext* context)
-        : _queue_capacity(0),
+                                               int send_id, int be_number, RuntimeState* state)
+        : HasTaskExecutionCtx(state),
+          _queue_capacity(0),
           _is_finishing(false),
           _query_id(query_id),
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
           _be_number(be_number),
-          _context(context) {}
+          _context(state->get_query_ctx()) {}
 
 template <typename Parent>
 ExchangeSinkBuffer<Parent>::~ExchangeSinkBuffer() = default;
@@ -270,12 +271,25 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
-        send_callback->addFailedHandler(
-                [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
+        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                const InstanceLoId& id, const std::string& err) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // This means the ExchangeSinkBuffer object is already destroyed; no need to run the callback any more.
+                return;
+            }
+            _failed(id, err);
+        });
         send_callback->start_rpc_time = GetCurrentTimeNanos();
-        send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                             const PTransmitDataResult& result,
-                                             const int64_t& start_rpc_time) {
+        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                 const InstanceLoId& id, const bool& eos,
+                                                 const PTransmitDataResult& result,
+                                                 const int64_t& start_rpc_time) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // This means the ExchangeSinkBuffer object is already destroyed; no need to run the callback any more.
+                return;
+            }
             set_rpc_time(id, start_rpc_time, result.receive_time());
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
@@ -332,12 +346,25 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
-        send_callback->addFailedHandler(
-                [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
+        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                const InstanceLoId& id, const std::string& err) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // This means the ExchangeSinkBuffer object is already destroyed; no need to run the callback any more.
+                return;
+            }
+            _failed(id, err);
+        });
         send_callback->start_rpc_time = GetCurrentTimeNanos();
-        send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                             const PTransmitDataResult& result,
-                                             const int64_t& start_rpc_time) {
+        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
+                                                 const InstanceLoId& id, const bool& eos,
+                                                 const PTransmitDataResult& result,
+                                                 const int64_t& start_rpc_time) {
+            auto task_lock = weak_task_ctx.lock();
+            if (task_lock == nullptr) {
+                // This means the ExchangeSinkBuffer object is already destroyed; no need to run the callback any more.
+                return;
+            }
             set_rpc_time(id, start_rpc_time, result.receive_time());
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index a11b637f4d4..f0b55d528ec 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -196,9 +196,10 @@ struct ExchangeRpcContext {
 
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 template <typename Parent>
-class ExchangeSinkBuffer {
+class ExchangeSinkBuffer : public HasTaskExecutionCtx {
 public:
-    ExchangeSinkBuffer(PUniqueId, int, PlanNodeId, int, QueryContext*);
+    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
+                       RuntimeState* state);
     ~ExchangeSinkBuffer();
     void register_sink(TUniqueId);
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index aa29bb4cf0f..1f9ba3b4203 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -65,7 +65,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
-            id, _dest_node_id, _sink->_sender_id, _state->be_number(), state->get_query_ctx());
+            id, _dest_node_id, _sink->_sender_id, _state->be_number(), state);
 
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->register_pipeline_channels(_sink_buffer.get());
@@ -168,7 +168,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
-            id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx());
+            id, p._dest_node_id, _sender_id, _state->be_number(), state);
 
     register_channels(_sink_buffer.get());
     auto* _exchange_sink_dependency = _dependency;
@@ -414,7 +414,20 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         // 1. calculate range
         // 2. dispatch rows to channel
     }
-    return Status::OK();
+
+    Status final_st = Status::OK();
+    if (source_state == SourceState::FINISHED) {
+        local_state._serializer.reset_block();
+        for (int i = 0; i < local_state.channels.size(); ++i) {
+            Status st = local_state.channels[i]->close(state, Status::OK());
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        local_state._sink_buffer->set_should_stop();
+        return final_st;
+    }
+    return final_st;
 }
 
 Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vectorized::Block* src,
@@ -488,21 +501,6 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
     return Status::OK();
 }
 
-Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) {
-    auto& local_state = get_local_state(state);
-    local_state._serializer.reset_block();
-    Status final_st = Status::OK();
-    Status final_status = exec_status;
-    for (int i = 0; i < local_state.channels.size(); ++i) {
-        Status st = local_state.channels[i]->close(state, exec_status);
-        if (!st.ok() && final_st.ok()) {
-            final_st = st;
-        }
-    }
-    local_state._sink_buffer->set_should_stop();
-    return final_st;
-}
-
 std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 6d1d1b6a4fe..3e6486e34fd 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -217,8 +217,6 @@ public:
     Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest,
                            int num_receivers = 1);
 
-    Status try_close(RuntimeState* state, Status exec_status) override;
-
 private:
     friend class ExchangeSinkLocalState;
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index bf41c670e0c..8755872a979 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -239,8 +239,6 @@ public:
      */
     virtual bool is_pending_finish() const { return false; }
 
-    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
     bool is_closed() const { return _is_closed; }
 
     const OperatorBuilderBase* operator_builder() const { return _operator_builder; }
@@ -289,10 +287,6 @@ public:
         return Status::OK();
     }
 
-    Status try_close(RuntimeState* state) override {
-        return _sink->try_close(state, state->query_status());
-    }
-
     [[nodiscard]] bool is_pending_finish() const override { return _sink->is_pending_finish(); }
 
     Status close(RuntimeState* state) override {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 909039b23fb..2341071d963 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -163,13 +163,12 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     // make result receiver on fe be stocked on rpc forever until timeout...
     // We need a more detail discussion.
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+            _is_report_on_cancel = false;
+        } else {
             LOG(WARNING) << "PipelineFragmentContext "
                          << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
                          << " is canceled, cancel message: " << msg;
-
-        } else {
-            _set_is_report_on_cancel(false); // TODO bug llj fix this not projected by lock
         }
 
         _runtime_state->set_process_status(_query_ctx->exec_status());
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index a7a45d8f07f..0ec27c5054f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -158,7 +158,6 @@ protected:
     Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
     virtual void _close_fragment_instance();
     void _init_next_report_time();
-    void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
 
     // Id of this query
     TUniqueId _query_id;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index d87c477a8db..9b72762ebf9 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -301,17 +301,8 @@ Status PipelineTask::execute(bool* eos) {
         _finish_p_dependency();
     }
 
-    return Status::OK();
-}
-
-Status PipelineTask::try_close(Status exec_status) {
-    if (_try_close_flag) {
-        return Status::OK();
-    }
-    _try_close_flag = true;
-    Status status1 = _sink->try_close(_state);
-    Status status2 = _source->try_close(_state);
-    return status1.ok() ? status2 : status1;
+    // If the status is EOF (the sink node returns EOF when the downstream fragment is finished), then return it.
+    return status;
 }
 
 Status PipelineTask::close(Status exec_status) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 56e42370ff2..dd512293e05 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -132,9 +132,6 @@ public:
 
     virtual Status execute(bool* eos);
 
-    // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
-    // this task will enter the `PENDING_FINISH` state.
-    virtual Status try_close(Status exec_status);
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
     virtual Status close(Status exec_status);
@@ -328,8 +325,6 @@ protected:
     int _queue_level = 0;
     int _core_id = 0;
 
-    bool _try_close_flag = false;
-
     RuntimeProfile* _parent_profile = nullptr;
     std::unique_ptr<RuntimeProfile> _task_profile;
     RuntimeProfile::Counter* _task_cpu_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index e00b1632eb4..a59f4ced6de 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -544,20 +544,21 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
     COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
     // if the init failed, the _writer may be nullptr. so here need check
     if (_writer) {
-        RETURN_IF_ERROR(_writer->get_writer_status());
+        Status st = _writer->get_writer_status();
+        if (exec_status.ok()) {
+            _writer->force_close(state->is_cancelled() ? Status::Cancelled("Cancelled")
+                                                       : Status::Cancelled("force close"));
+        } else {
+            _writer->force_close(exec_status);
+        }
+        // If there is an error in the process_block thread, we should get the writer
+        // status before calling force_close. For example, the thread may fail to
+        // commit the transaction.
+        RETURN_IF_ERROR(st);
     }
     return Base::close(state, exec_status);
 }
 
-template <typename Writer, typename Parent>
-    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
-Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status exec_status) {
-    if (state->is_cancelled() || !exec_status.ok()) {
-        _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
-    }
-    return Status::OK();
-}
-
 #define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
 DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index ca1b224c5c5..98feb60ac22 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -387,7 +387,6 @@ public:
     // idempotent (e.g. wait for runtime filters).
     virtual Status open(RuntimeState* state) = 0;
     virtual Status close(RuntimeState* state, Status exec_status) = 0;
-    virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0;
 
@@ -512,10 +511,6 @@ public:
         return Status::InternalError("Should not reach here!");
     }
 
-    Status try_close(RuntimeState* state) override {
-        return Status::InternalError("Should not reach here!");
-    }
-
     bool can_read() override {
         LOG(FATAL) << "should not reach here!";
         return false;
@@ -546,10 +541,6 @@ public:
         return state->get_sink_local_state(operator_id())->close(state, exec_status);
     }
 
-    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
-        return state->get_sink_local_state(operator_id())->try_close(state, exec_status);
-    }
-
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "Runtime Profile is not owned by operator");
@@ -620,8 +611,6 @@ public:
 
     Status open(RuntimeState* state) override { return Status::OK(); }
 
-    Status try_close(RuntimeState* state, Status exec_status) override { return Status::OK(); }
-
     Status close(RuntimeState* state, Status exec_status) override;
 
     [[nodiscard]] std::string debug_string(int indentation_level) const override;
@@ -704,8 +693,6 @@ public:
     Dependency* dependency() override { return _async_writer_dependency.get(); }
     Status close(RuntimeState* state, Status exec_status) override;
 
-    Status try_close(RuntimeState* state, Status exec_status) override;
-
     Dependency* finishdependency() override { return _finish_dependency.get(); }
 
 protected:
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index cf4c312926e..aa580b49d48 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -133,12 +133,12 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
     }
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+            _is_report_on_cancel = false;
+        } else {
             for (auto& id : _fragment_instance_ids) {
                 LOG(WARNING) << "PipelineXFragmentContext cancel instance: " 
<< print_id(id);
             }
-        } else {
-            _set_is_report_on_cancel(false); // TODO bug llj
         }
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 1b1c04eb814..c44570eb7c6 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -256,6 +256,7 @@ Status PipelineXTask::execute(bool* eos) {
         }
     }
 
+    Status status = Status::OK();
     set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
         if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
@@ -287,7 +288,7 @@ Status PipelineXTask::execute(bool* eos) {
         *eos = _data_state == SourceState::FINISHED;
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
-            auto status = _sink->sink(_state, block, _data_state);
+            status = _sink->sink(_state, block, _data_state);
             if (!status.is<ErrorCode::END_OF_FILE>()) {
                 RETURN_IF_ERROR(status);
             }
@@ -298,7 +299,7 @@ Status PipelineXTask::execute(bool* eos) {
         }
     }
 
-    return Status::OK();
+    return status;
 }
 
 void PipelineXTask::finalize() {
@@ -312,16 +313,6 @@ void PipelineXTask::finalize() {
     _le_state_map.clear();
 }
 
-Status PipelineXTask::try_close(Status exec_status) {
-    if (_try_close_flag) {
-        return Status::OK();
-    }
-    _try_close_flag = true;
-    Status status1 = _sink->try_close(_state, exec_status);
-    Status status2 = _source->try_close(_state);
-    return status1.ok() ? status2 : status1;
-}
-
 Status PipelineXTask::close(Status exec_status) {
     int64_t close_ns = 0;
     Defer defer {[&]() {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 164b00a8d28..96069cbbea2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -67,9 +67,6 @@ public:
 
     Status execute(bool* eos) override;
 
-    // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
-    // this task will enter the `PENDING_FINISH` state.
-    Status try_close(Status exec_status) override;
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status) override;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d253c6a589a..1f8a714a623 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -246,13 +246,14 @@ void TaskScheduler::_do_work(size_t index) {
         bool canceled = fragment_ctx->is_canceled();
 
         auto state = task->get_state();
+        // If the state is PENDING_FINISH, the task came from the blocked queue and its
+        // is_pending_finish must return false. The task is finished and needs to be closed now.
         if (state == PipelineTaskState::PENDING_FINISH) {
             DCHECK(task->is_pipelineX() || !task->is_pending_finish())
                     << "must not pending close " << task->debug_string();
             Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-            _try_close_task(task,
-                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
-                            exec_status);
+            _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                        exec_status);
             continue;
         }
 
@@ -267,7 +268,7 @@ void TaskScheduler::_do_work(size_t index) {
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
             Status cancel_status = fragment_ctx->get_query_ctx()->exec_status();
-            _try_close_task(task, PipelineTaskState::CANCELED, cancel_status);
+            _close_task(task, PipelineTaskState::CANCELED, cancel_status);
             continue;
         }
 
@@ -289,7 +290,12 @@ void TaskScheduler::_do_work(size_t index) {
         }
 
         task->set_previous_core_id(index);
-        if (!status.ok()) {
+
+        if (status.is<ErrorCode::END_OF_FILE>()) {
+            // Sink operator finished, just close task now.
+            _close_task(task, PipelineTaskState::FINISHED, Status::OK());
+            continue;
+        } else if (!status.ok()) {
             task->set_eos_time();
             LOG(WARNING) << fmt::format(
                     "Pipeline task failed. query_id: {} reason: {}",
@@ -302,7 +308,7 @@ void TaskScheduler::_do_work(size_t index) {
 
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.msg());
-            _try_close_task(task, PipelineTaskState::CANCELED, status);
+            _close_task(task, PipelineTaskState::CANCELED, status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -316,10 +322,31 @@ void TaskScheduler::_do_work(size_t index) {
                     PrintInstanceStandardInfo(task->query_context()->query_id(),
                                               task->fragment_context()->get_fragment_instance_id()),
                     fragment_ctx->is_canceled());
-            _try_close_task(task,
-                            fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
-                                                        : PipelineTaskState::FINISHED,
-                            status);
+            if (task->is_pipelineX()) {
+                // is_pending_finish will add the task to the dependency's blocking queue,
+                // and the task will be added back to the running queue when the dependency is ready.
+                if (task->is_pending_finish()) {
+                    // Only on eos should the task be set to the PENDING_FINISH state.
+                    task->set_state(PipelineTaskState::PENDING_FINISH);
+                    task->set_running(false);
+                } else {
+                    // Otherwise the task can be closed directly.
+                    Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
+                    _close_task(
+                            task,
+                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                            exec_status);
+                }
+            } else {
+                // Only on eos should the task be set to the PENDING_FINISH state. This is fine
+                // for the pipeline engine: it checks is_pending_finish and re-invokes the task when ready.
+                task->set_state(PipelineTaskState::PENDING_FINISH);
+                task->set_running(false);
+                // After the task is added to the blocked queue, it may be run by another thread
+                // and the task may be released in that other thread, which would then crash
+                // when setting the task to running.
+                static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+            }
             continue;
         }
 
@@ -343,39 +370,20 @@ void TaskScheduler::_do_work(size_t index) {
     }
 }
 
-void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
-                                    Status exec_status) {
+void TaskScheduler::_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) {
     // close_a_pipeline may delete fragment context and will core in some defer
     // code, because the defer code will access fragment context it self.
     auto lock_for_context = task->fragment_context()->shared_from_this();
-    auto status = task->try_close(exec_status);
-    auto cancel = [&]() {
+    // is_pending_finish does not check status, so the status has to be checked in the close API.
+    // For example, in the async writer the writer may fail while dealing with the eos block
+    // but not return an error status; the error status has to be checked in close.
+    // All source and sink APIs have been refactored so that close no longer needs to wait
+    // for pending finish, so close can be called directly.
+    Status status = task->close(exec_status);
+    if (!status.ok() && state != PipelineTaskState::CANCELED) {
         task->query_context()->cancel(true, status.to_string(),
                                       Status::Cancelled(status.to_string()));
         state = PipelineTaskState::CANCELED;
-    };
-
-    auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
-    if (try_close_failed) {
-        cancel();
-    }
-    if (!task->is_pipelineX() && task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        // After the task is added to the block queue, it maybe run by another thread
-        // and the task maybe released in the other thread. And will core at
-        // task set running.
-        static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
-        task->set_running(false);
-        return;
-    } else if (task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        task->set_running(false);
-        return;
-    }
-
-    status = task->close(exec_status);
-    if (!status.ok() && state != PipelineTaskState::CANCELED) {
-        cancel();
     }
     task->set_state(state);
     task->set_close_pipeline_time();
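
Taken together, the end-of-stream handling in _do_work now reduces to roughly
the following decision tree (a sketch of the control flow above, not the
literal committed code):

    if (status.is<ErrorCode::END_OF_FILE>()) {
        // Sink finished: close immediately.
        _close_task(task, PipelineTaskState::FINISHED, Status::OK());
    } else if (!status.ok()) {
        // Execution failed: cancel the fragment, then close.
        fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.msg());
        _close_task(task, PipelineTaskState::CANCELED, status);
    } else if (eos && task->is_pending_finish()) {
        // Resources still draining: revisit once the dependency is ready.
        task->set_state(PipelineTaskState::PENDING_FINISH);
    } else if (eos) {
        _close_task(task,
                    canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
                    fragment_ctx->get_query_ctx()->exec_status());
    }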
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index af12376c5b7..41ac8c0c098 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -103,8 +103,8 @@ private:
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 
     void _do_work(size_t index);
-    // after _try_close_task, task maybe destructed.
-    void _try_close_task(PipelineTask* task, PipelineTaskState state,
-                         Status exec_status = Status::OK());
+    // after _close_task, the task may already be destroyed.
+    void _close_task(PipelineTask* task, PipelineTaskState state,
+                     Status exec_status = Status::OK());
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/task_execution_context.h b/be/src/runtime/task_execution_context.h
index c876ed5cb0d..3ea54177a05 100644
--- a/be/src/runtime/task_execution_context.h
+++ b/be/src/runtime/task_execution_context.h
@@ -47,6 +47,7 @@ struct HasTaskExecutionCtx {
 
 public:
     inline TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); }
+    inline Weak weak_task_exec_ctx() const { return task_exec_ctx_; }
 
 private:
     Weak task_exec_ctx_;
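
The new weak_task_exec_ctx() accessor is what the exchange-sink callbacks above
capture. A minimal usage sketch (the callback shape and _on_done() are
illustrative, not part of the patch):

    auto cb = [this, weak_task_ctx = weak_task_exec_ctx()](const Status& st) {
        // Lock the weak context first: if the owning task is already gone,
        // `this` may be destroyed and touching members would be a use-after-free.
        auto task_lock = weak_task_ctx.lock();
        if (task_lock == nullptr) {
            return;
        }
        _on_done(st); // hypothetical member handler
    };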
diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h
index 1260382d6fa..8105ff96573 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -105,7 +105,17 @@ public:
         if (_writer) {
             // For pipeline engine, the writer is always closed in async thread process_block
             if (state->enable_pipeline_exec()) {
-                RETURN_IF_ERROR(_writer->get_writer_status());
+                Status st = _writer->get_writer_status();
+                if (exec_status.ok()) {
+                    _writer->force_close(state->is_cancelled() ? Status::Cancelled("Cancelled")
+                                                               : Status::Cancelled("force close"));
+                } else {
+                    _writer->force_close(exec_status);
+                }
+                // If there is an error in the process_block thread, we should get the writer
+                // status before calling force_close. For example, the thread may fail to
+                // commit the transaction.
+                RETURN_IF_ERROR(st);
             } else {
                 RETURN_IF_ERROR(_writer->close(exec_status));
             }
@@ -113,13 +123,6 @@ public:
         return DataSink::close(state, exec_status);
     }
 
-    Status try_close(RuntimeState* state, Status exec_status) override {
-        if (state->is_cancelled() || !exec_status.ok()) {
-            _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
-        }
-        return Status::OK();
-    }
-
     [[nodiscard]] bool is_pending_finish() const override { return _writer->is_pending_finish(); }
 
 protected:
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 6c4d10839e0..79ffebcc21e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -282,6 +282,7 @@ Status Channel<Parent>::close_internal(Status exec_status) {
                 _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
             }
         } else {
+            // The non-pipeline engine will send an empty eos block
             status = send_remote_block((PBlock*)nullptr, true, exec_status);
         }
     }
@@ -632,17 +633,20 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         // 1. calculate range
         // 2. dispatch rows to channel
     }
-    return Status::OK();
-}
 
-Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
-    SCOPED_TIMER(_exec_timer);
-    _serializer.reset_block();
+    // If eos == true, this is the last block, so the channels should be closed in this step.
     Status final_st = Status::OK();
-    for (int i = 0; i < _channels.size(); ++i) {
-        Status st = _channels[i]->close(state, exec_status);
-        if (!st.ok() && final_st.ok()) {
-            final_st = st;
+    // For the non-pipeline engine, there may still be a block in the serializer to wait for.
+    if (eos && _enable_pipeline_exec) {
+        _serializer.reset_block();
+        for (int i = 0; i < _channels.size(); ++i) {
+            // For the non-pipeline engine, this API may hang waiting for the last rpc.
+            // For the pipeline engine, it adds the block to the exchange sink buffer
+            // and then enters the pending finish state.
+            Status st = _channels[i]->close(state, Status::OK());
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
         }
     }
     return final_st;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index ca020d9bab8..1224ded31ee 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -122,7 +122,7 @@ public:
     Status open(RuntimeState* state) override;
 
     Status send(RuntimeState* state, Block* block, bool eos = false) override;
-    Status try_close(RuntimeState* state, Status exec_status) override;
+
     Status close(RuntimeState* state, Status exec_status) override;
 
     RuntimeState* state() { return _state; }
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 35d94fe3c47..471d4518724 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -67,8 +67,6 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
         if (_dependency && !_data_queue_is_available() && !_is_finished()) {
             _dependency->block();
         }
-    } else if (_eos && _data_queue.empty()) {
-        status = Status::EndOfFile("Run out of sink data");
     }
 
     _cv.notify_one();
@@ -87,8 +85,20 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
 }
 
 void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) {
+    // Should be set to false here, before the writer task is submitted to the thread pool.
+    _writer_thread_closed = false;
+    // This is an async thread; lock the task ctx to make sure the RuntimeState and profile
+    // are not destroyed before the thread exits.
+    auto task_ctx = state->get_task_execution_context();
     static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
-            [this, state, profile]() { this->process_block(state, profile); }));
+            [this, state, profile, task_ctx]() {
+                auto task_lock = task_ctx.lock();
+                if (task_lock == nullptr) {
+                    _writer_thread_closed = true;
+                    return;
+                }
+                this->process_block(state, profile);
+            }));
 }
 
 void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
@@ -131,16 +141,17 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
     // There is a unique ptr err_msg in Status, if it is modified, the unique ptr
     // maybe released. And it will core because use after free.
     std::lock_guard l(_m);
+    // eos only means the last block has been put into the queue and no more blocks will be
+    // added; it does not guarantee that the block has been written to the stream.
     if (_writer_status.ok() && _eos) {
         _writer_status = finish(state);
     }
 
+    Status close_st = close(_writer_status);
+    // If it already failed before, do not update the writer status so that we can get
+    // the real reason.
     if (_writer_status.ok()) {
-        _writer_status = close(_writer_status);
-    } else {
-        // If it is already failed before, then not update the write status so that we could get
-        // the real reason.
-        static_cast<void>(close(_writer_status));
+        _writer_status = close_st;
     }
     _writer_thread_closed = true;
     if (_finish_dependency) {
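
The close-time ordering that both AsyncWriterSink::close() hunks in this patch
rely on can be summarized as follows (a sketch assuming the AsyncResultWriter
interface shown above, with the is_cancelled branch elided):

    // Snapshot the writer status first: the async thread may already have
    // failed, e.g. while committing the transaction.
    Status st = _writer->get_writer_status();
    _writer->force_close(exec_status.ok() ? Status::Cancelled("force close") : exec_status);
    // Surface the original failure, not the cancellation injected by force_close.
    RETURN_IF_ERROR(st);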
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 5fbcca98af3..8ed39aeb795 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -104,7 +104,9 @@ private:
     std::deque<std::unique_ptr<Block>> _data_queue;
     Status _writer_status = Status::OK();
     bool _eos = false;
-    bool _writer_thread_closed = false;
+    // The writer is not started at the beginning. If prepare fails before open, the writer
+    // is never started, so we should not wait for pending finish on it.
+    bool _writer_thread_closed = true;
 
     // Used by pipelineX
     pipeline::AsyncWriterDependency* _dependency;

