This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 29b94c4ed7 [pipeline](refactor) refine pipeline fragment context 
(#23478)
29b94c4ed7 is described below

commit 29b94c4ed75eaf4d094e640c9b4e8b0de31750ee
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 28 15:55:02 2023 +0800

    [pipeline](refactor) refine pipeline fragment context (#23478)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 13 +++-----
 be/src/pipeline/exec/exchange_sink_buffer.h        |  6 ++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 28 ++++++-----------
 be/src/pipeline/exec/exchange_sink_operator.h      | 20 +++---------
 be/src/pipeline/pipeline_fragment_context.cpp      | 35 +++++----------------
 be/src/pipeline/pipeline_fragment_context.h        |  9 ++----
 be/src/pipeline/pipeline_task.h                    |  2 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 36 +++++++---------------
 be/src/pipeline/task_scheduler.cpp                 | 15 +++++----
 be/src/runtime/query_context.h                     | 33 +++++++++++++++++---
 be/src/runtime/runtime_state.cpp                   |  2 +-
 be/src/util/proto_util.h                           |  5 ++-
 be/src/vec/sink/vdata_stream_sender.cpp            |  3 +-
 13 files changed, 84 insertions(+), 123 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 48050c83cc..118b2a31b5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -48,8 +48,7 @@ namespace doris::pipeline {
 
 template <typename Parent>
 ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId 
dest_node_id,
-                                               int send_id, int be_number,
-                                               PipelineFragmentContext* 
context)
+                                               int send_id, int be_number, 
QueryContext* context)
         : _is_finishing(false),
           _query_id(query_id),
           _dest_node_id(dest_node_id),
@@ -84,7 +83,7 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
     // 1 make ExchangeSinkBuffer support try close which calls 
brpc::StartCancel
     // 2 make BlockScheduler calls tryclose when query is cancel
     DCHECK(_context != nullptr);
-    bool need_cancel = _context->is_canceled();
+    bool need_cancel = _context->is_cancelled();
 
     for (auto& pair : _instance_to_package_queue_mutex) {
         std::unique_lock<std::mutex> lock(*(pair.second));
@@ -235,8 +234,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             if (enable_http_send_block(*brpc_request)) {
-                
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
-                                                    *brpc_request,
+                RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), 
closure, *brpc_request,
                                                     
request.channel->_brpc_dest_addr));
             } else {
                 transmit_block(*request.channel->_brpc_stub, closure, 
*brpc_request);
@@ -288,8 +286,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             if (enable_http_send_block(*brpc_request)) {
-                
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
-                                                    *brpc_request,
+                RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), 
closure, *brpc_request,
                                                     
request.channel->_brpc_dest_addr));
             } else {
                 transmit_block(*request.channel->_brpc_stub, closure, 
*brpc_request);
@@ -326,7 +323,7 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string& 
err) {
     _is_finishing = true;
-    _context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, err);
+    _context->cancel(true, err, Status::Cancelled(err));
     _ended(id);
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index d9a0d316c6..f6c702b417 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -99,8 +99,6 @@ struct BroadcastTransmitInfo {
     bool eos;
 };
 
-class PipelineFragmentContext;
-
 template <typename T>
 class SelfDeleteClosure : public google::protobuf::Closure {
 public:
@@ -167,7 +165,7 @@ struct ExchangeRpcContext {
 template <typename Parent>
 class ExchangeSinkBuffer {
 public:
-    ExchangeSinkBuffer(PUniqueId, int, PlanNodeId, int, 
PipelineFragmentContext*);
+    ExchangeSinkBuffer(PUniqueId, int, PlanNodeId, int, QueryContext*);
     ~ExchangeSinkBuffer();
     void register_sink(TUniqueId);
 
@@ -207,7 +205,7 @@ private:
     int _sender_id;
     int _be_number;
     std::atomic<int64_t> _rpc_count = 0;
-    PipelineFragmentContext* _context;
+    QueryContext* _context;
 
     Status _send_rpc(InstanceLoId);
     // must hold the _instance_to_package_queue_mutex[id] mutex to opera
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index ba04caae9b..80954ee8a1 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -26,7 +26,6 @@
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
 #include "pipeline/exec/operator.h"
-#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
 #include "vec/columns/column_const.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/sink/vdata_stream_sender.h"
@@ -38,21 +37,16 @@ class DataSink;
 namespace doris::pipeline {
 
 ExchangeSinkOperatorBuilder::ExchangeSinkOperatorBuilder(int32_t id, DataSink* 
sink,
-                                                         
PipelineFragmentContext* context,
                                                          int mult_cast_id)
-        : DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink),
-          _context(context),
-          _mult_cast_id(mult_cast_id) {}
+        : DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), 
_mult_cast_id(mult_cast_id) {}
 
 OperatorPtr ExchangeSinkOperatorBuilder::build_operator() {
-    return std::make_shared<ExchangeSinkOperator>(this, _sink, _context, 
_mult_cast_id);
+    return std::make_shared<ExchangeSinkOperator>(this, _sink, _mult_cast_id);
 }
 
 ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilderBase* 
operator_builder, DataSink* sink,
-                                           PipelineFragmentContext* context, 
int mult_cast_id)
-        : DataSinkOperator(operator_builder, sink),
-          _context(context),
-          _mult_cast_id(mult_cast_id) {}
+                                           int mult_cast_id)
+        : DataSinkOperator(operator_builder, sink), 
_mult_cast_id(mult_cast_id) {}
 
 Status ExchangeSinkOperator::init(const TDataSink& tsink) {
     // -1 means not the mult cast stream sender
@@ -70,7 +64,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = 
std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
-            id, _dest_node_id, _sink->_sender_id, _state->be_number(), 
_context);
+            id, _dest_node_id, _sink->_sender_id, _state->be_number(), 
state->get_query_ctx());
 
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->registe_channels(_sink_buffer.get());
@@ -153,7 +147,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = 
std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
-            id, p._dest_node_id, _sender_id, _state->be_number(), p._context);
+            id, p._dest_node_id, _sender_id, _state->be_number(), 
state->get_query_ctx());
 
     register_channels(_sink_buffer.get());
 
@@ -189,9 +183,8 @@ segment_v2::CompressionTypePB& 
ExchangeSinkLocalState::compression_type() {
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
         const TDataStreamSink& sink, const 
std::vector<TPlanFragmentDestination>& destinations,
-        bool send_query_statistics_with_every_batch, PipelineXFragmentContext* 
context)
+        bool send_query_statistics_with_every_batch)
         : DataSinkOperatorX(sink.dest_node_id),
-          _context(context),
           _pool(pool),
           _row_desc(row_desc),
           _part_type(sink.output_partition.type),
@@ -211,9 +204,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId 
dest_node_id,
         const std::vector<TPlanFragmentDestination>& destinations,
-        bool send_query_statistics_with_every_batch, PipelineXFragmentContext* 
context)
+        bool send_query_statistics_with_every_batch)
         : DataSinkOperatorX(dest_node_id),
-          _context(context),
           _pool(pool),
           _row_desc(row_desc),
           _part_type(TPartitionType::UNPARTITIONED),
@@ -225,10 +217,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
 }
 
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(ObjectPool* pool, const 
RowDescriptor& row_desc,
-                                             bool 
send_query_statistics_with_every_batch,
-                                             PipelineXFragmentContext* context)
+                                             bool 
send_query_statistics_with_every_batch)
         : DataSinkOperatorX(0),
-          _context(context),
           _pool(pool),
           _row_desc(row_desc),
           
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index c9f9c2528d..b1f3424394 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -32,27 +32,22 @@ class RuntimeState;
 class TDataSink;
 
 namespace pipeline {
-class PipelineFragmentContext;
-class PipelineXFragmentContext;
 
 class ExchangeSinkOperatorBuilder final
         : public DataSinkOperatorBuilder<vectorized::VDataStreamSender> {
 public:
-    ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, 
PipelineFragmentContext* context,
-                                int mult_cast_id = -1);
+    ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, int mult_cast_id = 
-1);
 
     OperatorPtr build_operator() override;
 
 private:
-    PipelineFragmentContext* _context;
     int _mult_cast_id = -1;
 };
 
 // Now local exchange is not supported since VDataStreamRecvr is considered as 
a pipeline broker.
 class ExchangeSinkOperator final : public 
DataSinkOperator<ExchangeSinkOperatorBuilder> {
 public:
-    ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
-                         PipelineFragmentContext* context, int mult_cast_id);
+    ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* 
sink, int mult_cast_id);
     Status init(const TDataSink& tsink) override;
 
     Status prepare(RuntimeState* state) override;
@@ -67,7 +62,6 @@ private:
     std::unique_ptr<ExchangeSinkBuffer<vectorized::VDataStreamSender>> 
_sink_buffer;
     int _dest_node_id = -1;
     RuntimeState* _state = nullptr;
-    PipelineFragmentContext* _context;
     int _mult_cast_id = -1;
 };
 
@@ -159,15 +153,12 @@ public:
     ExchangeSinkOperatorX(RuntimeState* state, ObjectPool* pool, const 
RowDescriptor& row_desc,
                           const TDataStreamSink& sink,
                           const std::vector<TPlanFragmentDestination>& 
destinations,
-                          bool send_query_statistics_with_every_batch,
-                          PipelineXFragmentContext* context);
+                          bool send_query_statistics_with_every_batch);
     ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, 
PlanNodeId dest_node_id,
                           const std::vector<TPlanFragmentDestination>& 
destinations,
-                          bool send_query_statistics_with_every_batch,
-                          PipelineXFragmentContext* context);
+                          bool send_query_statistics_with_every_batch);
     ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc,
-                          bool send_query_statistics_with_every_batch,
-                          PipelineXFragmentContext* context);
+                          bool send_query_statistics_with_every_batch);
     Status init(const TDataSink& tsink) override;
 
     RuntimeState* state() { return _state; }
@@ -205,7 +196,6 @@ private:
                             const uint64_t* channel_ids, int rows, 
vectorized::Block* block,
                             bool eos);
     RuntimeState* _state = nullptr;
-    PipelineXFragmentContext* _context;
 
     ObjectPool* _pool;
     const RowDescriptor& _row_desc;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index c2a5080eed..c8f73cbaa3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -123,7 +123,6 @@ PipelineFragmentContext::PipelineFragmentContext(
           _fragment_id(fragment_id),
           _backend_num(backend_num),
           _exec_env(exec_env),
-          _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
           _query_ctx(std::move(query_ctx)),
           _call_back(call_back),
           _report_thread_active(false),
@@ -137,48 +136,28 @@ PipelineFragmentContext::PipelineFragmentContext(
 }
 
 PipelineFragmentContext::~PipelineFragmentContext() {
+    auto st = _query_ctx->exec_status();
     if (_runtime_state != nullptr) {
         // The memory released by the query end is recorded in the query mem 
tracker, main memory in _runtime_state.
         SCOPED_ATTACH_TASK(_runtime_state.get());
-        _call_back(_runtime_state.get(), &_exec_status);
+        _call_back(_runtime_state.get(), &st);
         _runtime_state.reset();
     } else {
-        _call_back(_runtime_state.get(), &_exec_status);
+        _call_back(_runtime_state.get(), &st);
     }
     DCHECK(!_report_thread_active);
 }
 
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
-    if (!_runtime_state->is_cancelled()) {
-        std::lock_guard<std::mutex> l(_status_lock);
-        if (_runtime_state->is_cancelled()) {
-            return;
-        }
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
-            _exec_status = Status::Cancelled(msg);
-        }
-        _runtime_state->set_is_cancelled(true, msg);
-
+    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
         LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
-
-        // Print detail informations below when you debugging here.
-        //
-        // for (auto& task : _tasks) {
-        //     LOG(WARNING) << task->debug_string();
-        // }
-
-        _runtime_state->set_process_status(_exec_status);
         // Get pipe from new load stream manager and send cancel to it or the 
fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = 
_exec_env->new_load_stream_mgr()->get(_query_id);
         if (stream_load_ctx != nullptr) {
             stream_load_ctx->pipe->cancel(msg);
         }
-        _cancel_reason = reason;
-        _cancel_msg = msg;
-        // To notify wait_for_start()
-        _query_ctx->set_ready_to_execute(true);
 
         // must close stream_mgr to avoid dead lock in Exchange Node
         _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
@@ -745,7 +724,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
     switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK: {
         sink_ = 
std::make_shared<ExchangeSinkOperatorBuilder>(thrift_sink.stream_sink.dest_node_id,
-                                                              _sink.get(), 
this);
+                                                              _sink.get());
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -811,7 +790,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
 
             // 3. create and set sink operator of data stream sender for new 
pipeline
             OperatorBuilderPtr sink_op_builder = 
std::make_shared<ExchangeSinkOperatorBuilder>(
-                    next_operator_builder_id(), 
_multi_cast_stream_sink_senders[i].get(), this, i);
+                    next_operator_builder_id(), 
_multi_cast_stream_sink_senders[i].get(), i);
             new_pipeline->set_sink(sink_op_builder);
 
             // 4. init and prepare the data_stream_sender of diff exchange
@@ -849,7 +828,7 @@ void PipelineFragmentContext::send_report(bool done) {
     Status exec_status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
-        exec_status = _exec_status;
+        exec_status = _query_ctx->exec_status();
     }
 
     // If plan is done successfully, but _is_report_success is false,
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 6deb82156f..de3451d11a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -116,10 +116,10 @@ public:
 
     Status update_status(Status status) {
         std::lock_guard<std::mutex> l(_status_lock);
-        if (!status.ok() && _exec_status.ok()) {
-            _exec_status = status;
+        if (!status.ok() && _query_ctx->exec_status().ok()) {
+            _query_ctx->set_exec_status(status);
         }
-        return _exec_status;
+        return _query_ctx->exec_status();
     }
 
     taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
@@ -149,9 +149,6 @@ protected:
     bool _submitted = false;
 
     std::mutex _status_lock;
-    Status _exec_status;
-    PPlanFragmentCancelReason _cancel_reason;
-    std::string _cancel_msg;
 
     Pipelines _pipelines;
     PipelineId _next_pipeline_id = 0;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index fc66ca54f1..8b407bbaeb 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -388,4 +388,4 @@ private:
     OperatorPtr _root;
     OperatorPtr _sink;
 };
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1a569202fb..830446abf8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -97,13 +97,13 @@ PipelineXFragmentContext::PipelineXFragmentContext(
                                   call_back, report_status_cb) {}
 
 PipelineXFragmentContext::~PipelineXFragmentContext() {
+    auto st = _query_ctx->exec_status();
     if (!_runtime_states.empty()) {
         // The memory released by the query end is recorded in the query mem 
tracker, main memory in _runtime_state.
         SCOPED_ATTACH_TASK(_runtime_state.get());
-        FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &_exec_status);
-                               runtime_state.reset();)
+        FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); 
runtime_state.reset();)
     } else {
-        _call_back(nullptr, &_exec_status);
+        _call_back(nullptr, &st);
     }
     _runtime_state.reset();
     DCHECK(!_report_thread_active);
@@ -111,35 +111,21 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
-    if (!_runtime_state->is_cancelled()) {
-        std::lock_guard<std::mutex> l(_status_lock);
-        if (_runtime_state->is_cancelled()) {
-            return;
-        }
-        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
-            _exec_status = Status::Cancelled(msg);
-        }
-
-        FOR_EACH_RUNTIME_STATE(
-                runtime_state->set_is_cancelled(true, msg);
-                runtime_state->set_process_status(_exec_status);
-                
_exec_env->vstream_mgr()->cancel(runtime_state->fragment_instance_id());)
-
+    if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
         LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
-
         // Get pipe from new load stream manager and send cancel to it or the 
fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = 
_exec_env->new_load_stream_mgr()->get(_query_id);
         if (stream_load_ctx != nullptr) {
             stream_load_ctx->pipe->cancel(msg);
         }
-        _cancel_reason = reason;
-        _cancel_msg = msg;
-        // To notify wait_for_start()
-        _query_ctx->set_ready_to_execute(true);
 
         // must close stream_mgr to avoid dead lock in Exchange Node
-        //
+        FOR_EACH_RUNTIME_STATE(
+                runtime_state->set_is_cancelled(true, msg);
+                runtime_state->set_process_status(_query_ctx->exec_status());
+                
_exec_env->vstream_mgr()->cancel(runtime_state->fragment_instance_id());)
+
         // Cancel the result queue manager used by spark doris connector
         // TODO pipeline incomp
         // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
@@ -244,7 +230,7 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
                         : false;
         _sink.reset(new ExchangeSinkOperatorX(state, pool, row_desc, 
thrift_sink.stream_sink,
                                               params.destinations,
-                                              
send_query_statistics_with_every_batch, this));
+                                              
send_query_statistics_with_every_batch));
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -665,7 +651,7 @@ void PipelineXFragmentContext::send_report(bool done) {
     Status exec_status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
-        exec_status = _exec_status;
+        exec_status = _query_ctx->exec_status();
     }
 
     // If plan is done successfully, but _is_report_success is false,
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 0be333479b..11d20ac7a9 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -118,15 +118,14 @@ void BlockedTaskScheduler::_schedule() {
                     _make_task_run(local_blocked_tasks, iter, ready_tasks,
                                    PipelineTaskState::PENDING_FINISH);
                 }
-            } else if (task->fragment_context()->is_canceled()) {
+            } else if (task->query_context()->is_cancelled()) {
                 _make_task_run(local_blocked_tasks, iter, ready_tasks);
             } else if (task->query_context()->is_timeout(now)) {
                 LOG(WARNING) << "Timeout, query_id=" << 
print_id(task->query_context()->query_id())
-                             << ", instance_id="
-                             << 
print_id(task->fragment_context()->get_fragment_instance_id())
+                             << ", instance_id=" << 
print_id(task->instance_id())
                              << ", task info: " << task->debug_string();
 
-                
task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT);
+                task->query_context()->cancel(true, "", Status::Cancelled(""));
                 _make_task_run(local_blocked_tasks, iter, ready_tasks);
             } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
                 if (task->has_dependency()) {
@@ -321,8 +320,8 @@ void TaskScheduler::_try_close_task(PipelineTask* task, 
PipelineTaskState state)
     if (!status.ok() && state != PipelineTaskState::CANCELED) {
         // Call `close` if `try_close` failed to make sure allocated resources 
are released
         task->close();
-        
task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                                         status.to_string());
+        task->query_context()->cancel(true, status.to_string(),
+                                      Status::Cancelled(status.to_string()));
         state = PipelineTaskState::CANCELED;
     } else if (task->is_pending_finish()) {
         task->set_state(PipelineTaskState::PENDING_FINISH);
@@ -331,8 +330,8 @@ void TaskScheduler::_try_close_task(PipelineTask* task, 
PipelineTaskState state)
     } else {
         status = task->close();
         if (!status.ok() && state != PipelineTaskState::CANCELED) {
-            
task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                                             status.to_string());
+            task->query_context()->cancel(true, status.to_string(),
+                                          
Status::Cancelled(status.to_string()));
             state = PipelineTaskState::CANCELED;
         }
         DCHECK(!task->is_pending_finish()) << task->debug_string();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dbb8c9b1c0..105ac80b81 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -133,11 +133,31 @@ public:
     }
 
     [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
-    void set_is_cancelled(bool v) {
+    bool cancel(bool v, std::string msg, Status new_status) {
+        if (_is_cancelled) {
+            return false;
+        }
         _is_cancelled.store(v);
-        // Create a error status, so that we could print error stack, and
-        // we could know which path call cancel.
-        LOG(INFO) << "task is cancelled, st = " << 
Status::Error<ErrorCode::CANCELLED>("");
+
+        set_ready_to_execute(true);
+        set_exec_status(new_status);
+        return true;
+    }
+
+    void set_exec_status(Status new_status) {
+        if (new_status.ok()) {
+            return;
+        }
+        std::lock_guard<std::mutex> l(_exec_status_lock);
+        if (!_exec_status.ok()) {
+            return;
+        }
+        _exec_status = new_status;
+    }
+
+    [[nodiscard]] Status exec_status() {
+        std::lock_guard<std::mutex> l(_exec_status_lock);
+        return _exec_status;
     }
 
     void set_ready_to_execute_only() {
@@ -254,6 +274,11 @@ private:
     taskgroup::TaskGroupPtr _task_group;
     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
     const TQueryOptions _query_options;
+
+    std::mutex _exec_status_lock;
+    // All pipeline tasks use the same query context to report status. So we 
need a `_exec_status`
+    // to report the real message if failed.
+    Status _exec_status = Status::OK();
 };
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 3efce72ea1..944d1e42a3 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -305,7 +305,7 @@ void 
RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) {
 }
 
 bool RuntimeState::is_cancelled() const {
-    return _is_cancelled.load();
+    return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
 }
 
 Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 3c583b867c..b11180ab29 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -68,7 +68,7 @@ void transmit_block(PBackendService_Stub& stub, Closure* 
closure,
 }
 
 template <typename Closure>
-Status transmit_block_http(RuntimeState* state, Closure* closure, 
PTransmitDataParams& params,
+Status transmit_block_http(ExecEnv* exec_env, Closure* closure, 
PTransmitDataParams& params,
                            TNetworkAddress brpc_dest_addr) {
     RETURN_IF_ERROR(request_embed_attachment_contain_block(&params, closure));
 
@@ -76,8 +76,7 @@ Status transmit_block_http(RuntimeState* state, Closure* 
closure, PTransmitDataP
     std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, 
brpc_dest_addr.port);
 
     std::shared_ptr<PBackendService_Stub> brpc_http_stub =
-            
state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
-                                                                               
      "http");
+            
exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, 
"http");
     closure->cntl.http_request().uri() = brpc_url + 
"/PInternalServiceImpl/transmit_block_by_http";
     closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
     closure->cntl.http_request().set_content_type("application/json");
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index fe86eefa4e..4c547c7840 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -192,7 +192,8 @@ Status Channel<Parent>::send_block(PBlock* block, bool eos) 
{
     {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
         if (enable_http_send_block(_brpc_request, 
_parent->transfer_large_data_by_brpc())) {
-            RETURN_IF_ERROR(transmit_block_http(_state, _closure, 
_brpc_request, _brpc_dest_addr));
+            RETURN_IF_ERROR(transmit_block_http(_state->exec_env(), _closure, 
_brpc_request,
+                                                _brpc_dest_addr));
         } else {
             transmit_block(*_brpc_stub, _closure, _brpc_request);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to