This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 29b94c4ed7 [pipeline](refactor) refine pipeline fragment context (#23478)
29b94c4ed7 is described below
commit 29b94c4ed75eaf4d094e640c9b4e8b0de31750ee
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 28 15:55:02 2023 +0800
[pipeline](refactor) refine pipeline fragment context (#23478)
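
    This refactor moves query-wide error and cancellation bookkeeping out of
    PipelineFragmentContext (the _exec_status, _cancel_reason and _cancel_msg members
    are removed) into the shared QueryContext. Exchange sink buffers/operators and the
    task scheduler now hold a QueryContext* instead of a PipelineFragmentContext*, and
    transmit_block_http() takes an ExecEnv* rather than a RuntimeState*.

    A minimal sketch of the new cancellation path, condensed from the hunks below
    (simplified, not the full classes):

        // QueryContext now owns the query-level cancel flag and exec status.
        // cancel() returns true only for the first caller, so a fragment context
        // can log and clean up exactly once.
        bool QueryContext::cancel(bool v, std::string msg, Status new_status) {
            if (_is_cancelled) {
                return false;
            }
            _is_cancelled.store(v);
            set_ready_to_execute(true);   // wake fragments blocked in wait_for_start()
            set_exec_status(new_status);  // first non-OK status wins (see set_exec_status)
            return true;
        }

        // Call sites switch from fragment-level to query-level cancellation, e.g.:
        //   before: _context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, err);
        //   after:  _context->cancel(true, err, Status::Cancelled(err));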
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 13 +++-----
be/src/pipeline/exec/exchange_sink_buffer.h | 6 ++--
be/src/pipeline/exec/exchange_sink_operator.cpp | 28 ++++++-----------
be/src/pipeline/exec/exchange_sink_operator.h | 20 +++---------
be/src/pipeline/pipeline_fragment_context.cpp | 35 +++++----------------
be/src/pipeline/pipeline_fragment_context.h | 9 ++----
be/src/pipeline/pipeline_task.h | 2 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 36 +++++++---------------
be/src/pipeline/task_scheduler.cpp | 15 +++++----
be/src/runtime/query_context.h | 33 +++++++++++++++++---
be/src/runtime/runtime_state.cpp | 2 +-
be/src/util/proto_util.h | 5 ++-
be/src/vec/sink/vdata_stream_sender.cpp | 3 +-
13 files changed, 84 insertions(+), 123 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 48050c83cc..118b2a31b5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -48,8 +48,7 @@ namespace doris::pipeline {
template <typename Parent>
 ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
-                                               int send_id, int be_number,
-                                               PipelineFragmentContext* context)
+                                               int send_id, int be_number, QueryContext* context)
: _is_finishing(false),
_query_id(query_id),
_dest_node_id(dest_node_id),
@@ -84,7 +83,7 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
     // 1 make ExchangeSinkBuffer support try close which calls brpc::StartCancel
// 2 make BlockScheduler calls tryclose when query is cancel
DCHECK(_context != nullptr);
- bool need_cancel = _context->is_canceled();
+ bool need_cancel = _context->is_cancelled();
for (auto& pair : _instance_to_package_queue_mutex) {
std::unique_lock<std::mutex> lock(*(pair.second));
@@ -235,8 +234,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
if (enable_http_send_block(*brpc_request)) {
-                RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
-                                                    *brpc_request,
+                RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request,
                                                     request.channel->_brpc_dest_addr));
} else {
                transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
@@ -288,8 +286,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
if (enable_http_send_block(*brpc_request)) {
-                RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
-                                                    *brpc_request,
+                RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request,
                                                     request.channel->_brpc_dest_addr));
} else {
                transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
@@ -326,7 +323,7 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
- _context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, err);
+ _context->cancel(true, err, Status::Cancelled(err));
_ended(id);
}
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index d9a0d316c6..f6c702b417 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -99,8 +99,6 @@ struct BroadcastTransmitInfo {
bool eos;
};
-class PipelineFragmentContext;
-
template <typename T>
class SelfDeleteClosure : public google::protobuf::Closure {
public:
@@ -167,7 +165,7 @@ struct ExchangeRpcContext {
template <typename Parent>
class ExchangeSinkBuffer {
public:
-    ExchangeSinkBuffer(PUniqueId, int, PlanNodeId, int, PipelineFragmentContext*);
+ ExchangeSinkBuffer(PUniqueId, int, PlanNodeId, int, QueryContext*);
~ExchangeSinkBuffer();
void register_sink(TUniqueId);
@@ -207,7 +205,7 @@ private:
int _sender_id;
int _be_number;
std::atomic<int64_t> _rpc_count = 0;
- PipelineFragmentContext* _context;
+ QueryContext* _context;
Status _send_rpc(InstanceLoId);
// must hold the _instance_to_package_queue_mutex[id] mutex to opera
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index ba04caae9b..80954ee8a1 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -26,7 +26,6 @@
#include "common/status.h"
#include "exchange_sink_buffer.h"
#include "pipeline/exec/operator.h"
-#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
#include "vec/columns/column_const.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/vdata_stream_sender.h"
@@ -38,21 +37,16 @@ class DataSink;
namespace doris::pipeline {
 ExchangeSinkOperatorBuilder::ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink,
-                                                         PipelineFragmentContext* context,
                                                          int mult_cast_id)
-        : DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink),
-          _context(context),
-          _mult_cast_id(mult_cast_id) {}
+        : DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), _mult_cast_id(mult_cast_id) {}
OperatorPtr ExchangeSinkOperatorBuilder::build_operator() {
-    return std::make_shared<ExchangeSinkOperator>(this, _sink, _context, _mult_cast_id);
+ return std::make_shared<ExchangeSinkOperator>(this, _sink, _mult_cast_id);
}
 ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
-                                           PipelineFragmentContext* context, int mult_cast_id)
-        : DataSinkOperator(operator_builder, sink),
-          _context(context),
-          _mult_cast_id(mult_cast_id) {}
+                                           int mult_cast_id)
+        : DataSinkOperator(operator_builder, sink), _mult_cast_id(mult_cast_id) {}
Status ExchangeSinkOperator::init(const TDataSink& tsink) {
// -1 means not the mult cast stream sender
@@ -70,7 +64,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
     _sink_buffer = std::make_unique<ExchangeSinkBuffer<vectorized::VDataStreamSender>>(
-            id, _dest_node_id, _sink->_sender_id, _state->be_number(), _context);
+            id, _dest_node_id, _sink->_sender_id, _state->be_number(), state->get_query_ctx());
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
@@ -153,7 +147,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
     _sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
-            id, p._dest_node_id, _sender_id, _state->be_number(), p._context);
+            id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx());
register_channels(_sink_buffer.get());
@@ -189,9 +183,8 @@ segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() {
ExchangeSinkOperatorX::ExchangeSinkOperatorX(
RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
         const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
-        bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
+ bool send_query_statistics_with_every_batch)
: DataSinkOperatorX(sink.dest_node_id),
- _context(context),
_pool(pool),
_row_desc(row_desc),
_part_type(sink.output_partition.type),
@@ -211,9 +204,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
-        bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
+ bool send_query_statistics_with_every_batch)
: DataSinkOperatorX(dest_node_id),
- _context(context),
_pool(pool),
_row_desc(row_desc),
_part_type(TPartitionType::UNPARTITIONED),
@@ -225,10 +217,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
}
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc,
-                                             bool send_query_statistics_with_every_batch,
-                                             PipelineXFragmentContext* context)
+                                             bool send_query_statistics_with_every_batch)
: DataSinkOperatorX(0),
- _context(context),
_pool(pool),
_row_desc(row_desc),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index c9f9c2528d..b1f3424394 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -32,27 +32,22 @@ class RuntimeState;
class TDataSink;
namespace pipeline {
-class PipelineFragmentContext;
-class PipelineXFragmentContext;
class ExchangeSinkOperatorBuilder final
: public DataSinkOperatorBuilder<vectorized::VDataStreamSender> {
public:
-    ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context,
-                                int mult_cast_id = -1);
+    ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, int mult_cast_id = -1);
OperatorPtr build_operator() override;
private:
- PipelineFragmentContext* _context;
int _mult_cast_id = -1;
};
 // Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker.
 class ExchangeSinkOperator final : public DataSinkOperator<ExchangeSinkOperatorBuilder> {
public:
- ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
- PipelineFragmentContext* context, int mult_cast_id);
+    ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink, int mult_cast_id);
Status init(const TDataSink& tsink) override;
Status prepare(RuntimeState* state) override;
@@ -67,7 +62,6 @@ private:
     std::unique_ptr<ExchangeSinkBuffer<vectorized::VDataStreamSender>> _sink_buffer;
int _dest_node_id = -1;
RuntimeState* _state = nullptr;
- PipelineFragmentContext* _context;
int _mult_cast_id = -1;
};
@@ -159,15 +153,12 @@ public:
     ExchangeSinkOperatorX(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
                           const std::vector<TPlanFragmentDestination>& destinations,
- bool send_query_statistics_with_every_batch,
- PipelineXFragmentContext* context);
+ bool send_query_statistics_with_every_batch);
     ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                           const std::vector<TPlanFragmentDestination>& destinations,
- bool send_query_statistics_with_every_batch,
- PipelineXFragmentContext* context);
+ bool send_query_statistics_with_every_batch);
ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc,
- bool send_query_statistics_with_every_batch,
- PipelineXFragmentContext* context);
+ bool send_query_statistics_with_every_batch);
Status init(const TDataSink& tsink) override;
RuntimeState* state() { return _state; }
@@ -205,7 +196,6 @@ private:
                             const uint64_t* channel_ids, int rows, vectorized::Block* block,
bool eos);
RuntimeState* _state = nullptr;
- PipelineXFragmentContext* _context;
ObjectPool* _pool;
const RowDescriptor& _row_desc;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index c2a5080eed..c8f73cbaa3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -123,7 +123,6 @@ PipelineFragmentContext::PipelineFragmentContext(
_fragment_id(fragment_id),
_backend_num(backend_num),
_exec_env(exec_env),
- _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
_report_thread_active(false),
@@ -137,48 +136,28 @@ PipelineFragmentContext::PipelineFragmentContext(
}
PipelineFragmentContext::~PipelineFragmentContext() {
+ auto st = _query_ctx->exec_status();
if (_runtime_state != nullptr) {
         // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state.
SCOPED_ATTACH_TASK(_runtime_state.get());
- _call_back(_runtime_state.get(), &_exec_status);
+ _call_back(_runtime_state.get(), &st);
_runtime_state.reset();
} else {
- _call_back(_runtime_state.get(), &_exec_status);
+ _call_back(_runtime_state.get(), &st);
}
DCHECK(!_report_thread_active);
}
void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
- if (!_runtime_state->is_cancelled()) {
- std::lock_guard<std::mutex> l(_status_lock);
- if (_runtime_state->is_cancelled()) {
- return;
- }
- if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
- _exec_status = Status::Cancelled(msg);
- }
- _runtime_state->set_is_cancelled(true, msg);
-
+ if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
-
- // Print detail informations below when you debugging here.
- //
- // for (auto& task : _tasks) {
- // LOG(WARNING) << task->debug_string();
- // }
-
- _runtime_state->set_process_status(_exec_status);
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
// For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(msg);
}
- _cancel_reason = reason;
- _cancel_msg = msg;
- // To notify wait_for_start()
- _query_ctx->set_ready_to_execute(true);
// must close stream_mgr to avoid dead lock in Exchange Node
_exec_env->vstream_mgr()->cancel(_fragment_instance_id);
@@ -745,7 +724,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
switch (thrift_sink.type) {
case TDataSinkType::DATA_STREAM_SINK: {
         sink_ = std::make_shared<ExchangeSinkOperatorBuilder>(thrift_sink.stream_sink.dest_node_id,
-                                                              _sink.get(), this);
+                                                              _sink.get());
break;
}
case TDataSinkType::RESULT_SINK: {
@@ -811,7 +790,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
         // 3. create and set sink operator of data stream sender for new pipeline
         OperatorBuilderPtr sink_op_builder = std::make_shared<ExchangeSinkOperatorBuilder>(
-                next_operator_builder_id(), _multi_cast_stream_sink_senders[i].get(), this, i);
+                next_operator_builder_id(), _multi_cast_stream_sink_senders[i].get(), i);
new_pipeline->set_sink(sink_op_builder);
// 4. init and prepare the data_stream_sender of diff exchange
@@ -849,7 +828,7 @@ void PipelineFragmentContext::send_report(bool done) {
Status exec_status = Status::OK();
{
std::lock_guard<std::mutex> l(_status_lock);
- exec_status = _exec_status;
+ exec_status = _query_ctx->exec_status();
}
// If plan is done successfully, but _is_report_success is false,
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 6deb82156f..de3451d11a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -116,10 +116,10 @@ public:
Status update_status(Status status) {
std::lock_guard<std::mutex> l(_status_lock);
- if (!status.ok() && _exec_status.ok()) {
- _exec_status = status;
+ if (!status.ok() && _query_ctx->exec_status().ok()) {
+ _query_ctx->set_exec_status(status);
}
- return _exec_status;
+ return _query_ctx->exec_status();
}
taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
@@ -149,9 +149,6 @@ protected:
bool _submitted = false;
std::mutex _status_lock;
- Status _exec_status;
- PPlanFragmentCancelReason _cancel_reason;
- std::string _cancel_msg;
Pipelines _pipelines;
PipelineId _next_pipeline_id = 0;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index fc66ca54f1..8b407bbaeb 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -388,4 +388,4 @@ private:
OperatorPtr _root;
OperatorPtr _sink;
};
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1a569202fb..830446abf8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -97,13 +97,13 @@ PipelineXFragmentContext::PipelineXFragmentContext(
call_back, report_status_cb) {}
PipelineXFragmentContext::~PipelineXFragmentContext() {
+ auto st = _query_ctx->exec_status();
if (!_runtime_states.empty()) {
         // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state.
SCOPED_ATTACH_TASK(_runtime_state.get());
- FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &_exec_status);
- runtime_state.reset();)
+        FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); runtime_state.reset();)
} else {
- _call_back(nullptr, &_exec_status);
+ _call_back(nullptr, &st);
}
_runtime_state.reset();
DCHECK(!_report_thread_active);
@@ -111,35 +111,21 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
- if (!_runtime_state->is_cancelled()) {
- std::lock_guard<std::mutex> l(_status_lock);
- if (_runtime_state->is_cancelled()) {
- return;
- }
- if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
- _exec_status = Status::Cancelled(msg);
- }
-
- FOR_EACH_RUNTIME_STATE(
- runtime_state->set_is_cancelled(true, msg);
- runtime_state->set_process_status(_exec_status);
-                _exec_env->vstream_mgr()->cancel(runtime_state->fragment_instance_id());)
-
+ if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
-
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
// For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(msg);
}
- _cancel_reason = reason;
- _cancel_msg = msg;
- // To notify wait_for_start()
- _query_ctx->set_ready_to_execute(true);
// must close stream_mgr to avoid dead lock in Exchange Node
- //
+ FOR_EACH_RUNTIME_STATE(
+ runtime_state->set_is_cancelled(true, msg);
+ runtime_state->set_process_status(_query_ctx->exec_status());
+                _exec_env->vstream_mgr()->cancel(runtime_state->fragment_instance_id());)
+
// Cancel the result queue manager used by spark doris connector
// TODO pipeline incomp
         // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
@@ -244,7 +230,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
: false;
         _sink.reset(new ExchangeSinkOperatorX(state, pool, row_desc, thrift_sink.stream_sink,
params.destinations,
-                                              send_query_statistics_with_every_batch, this));
+                                              send_query_statistics_with_every_batch));
break;
}
case TDataSinkType::RESULT_SINK: {
@@ -665,7 +651,7 @@ void PipelineXFragmentContext::send_report(bool done) {
Status exec_status = Status::OK();
{
std::lock_guard<std::mutex> l(_status_lock);
- exec_status = _exec_status;
+ exec_status = _query_ctx->exec_status();
}
// If plan is done successfully, but _is_report_success is false,
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 0be333479b..11d20ac7a9 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -118,15 +118,14 @@ void BlockedTaskScheduler::_schedule() {
_make_task_run(local_blocked_tasks, iter, ready_tasks,
PipelineTaskState::PENDING_FINISH);
}
- } else if (task->fragment_context()->is_canceled()) {
+ } else if (task->query_context()->is_cancelled()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
} else if (task->query_context()->is_timeout(now)) {
LOG(WARNING) << "Timeout, query_id=" <<
print_id(task->query_context()->query_id())
-                         << ", instance_id="
-                         << print_id(task->fragment_context()->get_fragment_instance_id())
+                         << ", instance_id=" << print_id(task->instance_id())
<< ", task info: " << task->debug_string();
-            task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT);
+ task->query_context()->cancel(true, "", Status::Cancelled(""));
_make_task_run(local_blocked_tasks, iter, ready_tasks);
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
if (task->has_dependency()) {
@@ -321,8 +320,8 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
if (!status.ok() && state != PipelineTaskState::CANCELED) {
         // Call `close` if `try_close` failed to make sure allocated resources are released
task->close();
-        task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
- status.to_string());
+ task->query_context()->cancel(true, status.to_string(),
+ Status::Cancelled(status.to_string()));
state = PipelineTaskState::CANCELED;
} else if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
@@ -331,8 +330,8 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
} else {
status = task->close();
if (!status.ok() && state != PipelineTaskState::CANCELED) {
-            task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                                             status.to_string());
+            task->query_context()->cancel(true, status.to_string(),
+                                          Status::Cancelled(status.to_string()));
state = PipelineTaskState::CANCELED;
}
DCHECK(!task->is_pending_finish()) << task->debug_string();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dbb8c9b1c0..105ac80b81 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -133,11 +133,31 @@ public:
}
[[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
- void set_is_cancelled(bool v) {
+ bool cancel(bool v, std::string msg, Status new_status) {
+ if (_is_cancelled) {
+ return false;
+ }
_is_cancelled.store(v);
- // Create a error status, so that we could print error stack, and
- // we could know which path call cancel.
-        LOG(INFO) << "task is cancelled, st = " << Status::Error<ErrorCode::CANCELLED>("");
+
+ set_ready_to_execute(true);
+ set_exec_status(new_status);
+ return true;
+ }
+
+ void set_exec_status(Status new_status) {
+ if (new_status.ok()) {
+ return;
+ }
+ std::lock_guard<std::mutex> l(_exec_status_lock);
+ if (!_exec_status.ok()) {
+ return;
+ }
+ _exec_status = new_status;
+ }
+
+ [[nodiscard]] Status exec_status() {
+ std::lock_guard<std::mutex> l(_exec_status_lock);
+ return _exec_status;
}
void set_ready_to_execute_only() {
@@ -254,6 +274,11 @@ private:
taskgroup::TaskGroupPtr _task_group;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;
+
+ std::mutex _exec_status_lock;
+    // All pipeline tasks use the same query context to report status. So we need a `_exec_status`
+ // to report the real message if failed.
+ Status _exec_status = Status::OK();
};
} // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 3efce72ea1..944d1e42a3 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -305,7 +305,7 @@ void RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) {
}
bool RuntimeState::is_cancelled() const {
- return _is_cancelled.load();
+ return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
}
Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 3c583b867c..b11180ab29 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -68,7 +68,7 @@ void transmit_block(PBackendService_Stub& stub, Closure* closure,
}
template <typename Closure>
-Status transmit_block_http(RuntimeState* state, Closure* closure, PTransmitDataParams& params,
+Status transmit_block_http(ExecEnv* exec_env, Closure* closure, PTransmitDataParams& params,
TNetworkAddress brpc_dest_addr) {
     RETURN_IF_ERROR(request_embed_attachment_contain_block(&params, closure));
@@ -76,8 +76,7 @@ Status transmit_block_http(RuntimeState* state, Closure* closure, PTransmitDataP
     std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port);
     std::shared_ptr<PBackendService_Stub> brpc_http_stub =
-            state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
-                                                                                     "http");
+            exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http");
     closure->cntl.http_request().uri() = brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
closure->cntl.http_request().set_content_type("application/json");
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index fe86eefa4e..4c547c7840 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -192,7 +192,8 @@ Status Channel<Parent>::send_block(PBlock* block, bool eos) {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
         if (enable_http_send_block(_brpc_request, _parent->transfer_large_data_by_brpc())) {
-            RETURN_IF_ERROR(transmit_block_http(_state, _closure, _brpc_request, _brpc_dest_addr));
+            RETURN_IF_ERROR(transmit_block_http(_state->exec_env(), _closure, _brpc_request,
+                                                _brpc_dest_addr));
} else {
transmit_block(*_brpc_stub, _closure, _brpc_request);
}