This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 877935442f3 [feature](pipelineX)use markFragments instead of
markInstances in pipelineX (#27829)
877935442f3 is described below
commit 877935442f3a0592737b5e11dd1bc46eba5d5849
Author: Mryange <[email protected]>
AuthorDate: Mon Dec 11 17:59:53 2023 +0800
[feature](pipelineX)use markFragments instead of markInstances in pipelineX
(#27829)
---
be/src/exprs/runtime_filter.cpp | 38 ++--
be/src/exprs/runtime_filter.h | 34 ++--
be/src/exprs/runtime_filter_rpc.cpp | 16 +-
be/src/pipeline/pipeline_fragment_context.h | 7 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 208 ++++++++++++---------
.../pipeline_x/pipeline_x_fragment_context.h | 41 ++--
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 9 +-
be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 +
be/src/runtime/fragment_mgr.cpp | 132 ++++++++-----
be/src/runtime/fragment_mgr.h | 8 +
be/src/runtime/query_context.h | 5 +
be/src/runtime/runtime_filter_mgr.cpp | 13 +-
be/src/runtime/runtime_filter_mgr.h | 13 +-
be/src/runtime/runtime_state.cpp | 63 ++++++-
be/src/runtime/runtime_state.h | 43 ++++-
be/src/service/internal_service.cpp | 17 +-
be/test/exprs/runtime_filter_test.cpp | 5 +-
.../doris/common/profile/ExecutionProfile.java | 135 ++++++++++++-
.../main/java/org/apache/doris/qe/Coordinator.java | 161 +++++++++++-----
.../org/apache/doris/rpc/BackendServiceProxy.java | 18 ++
gensrc/proto/internal_service.proto | 2 +
21 files changed, 710 insertions(+), 261 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index de3f2ad4fd5..efdb8af7029 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -279,20 +279,20 @@ Status create_vbin_predicate(const TypeDescriptor& type,
TExprOpcode::type opcod
// This class is a wrapper of runtime predicate function
class RuntimePredicateWrapper {
public:
- RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
+ RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool*
pool,
const RuntimeFilterParams* params)
: _state(state),
- _be_exec_version(_state->be_exec_version()),
+ _be_exec_version(_state->be_exec_version),
_pool(pool),
_column_return_type(params->column_return_type),
_filter_type(params->filter_type),
_filter_id(params->filter_id) {}
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
- RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
PrimitiveType column_type,
- RuntimeFilterType type, uint32_t filter_id)
+ RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool*
pool,
+ PrimitiveType column_type, RuntimeFilterType type,
uint32_t filter_id)
: _state(state),
- _be_exec_version(_state->be_exec_version()),
+ _be_exec_version(_state->be_exec_version),
_pool(pool),
_column_return_type(column_type),
_filter_type(type),
@@ -945,7 +945,7 @@ public:
}
private:
- RuntimeState* _state;
+ RuntimeFilterParamsContext* _state;
QueryContext* _query_ctx;
int _be_exec_version;
ObjectPool* _pool;
@@ -962,9 +962,10 @@ private:
uint32_t _filter_id;
};
-Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const
TRuntimeFilterDesc* desc,
- const TQueryOptions* query_options, const
RuntimeFilterRole role,
- int node_id, IRuntimeFilter** res, bool
build_bf_exactly) {
+Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool*
pool,
+ const TRuntimeFilterDesc* desc, const
TQueryOptions* query_options,
+ const RuntimeFilterRole role, int node_id,
IRuntimeFilter** res,
+ bool build_bf_exactly) {
*res = pool->add(new IRuntimeFilter(state, pool, desc));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id,
build_bf_exactly);
@@ -1003,7 +1004,7 @@ Status IRuntimeFilter::publish() {
DCHECK(is_producer());
if (_has_local_target) {
std::vector<IRuntimeFilter*> filters;
-
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filters(_filter_id,
filters));
+
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id,
filters));
// push down
for (auto filter : filters) {
filter->_wrapper = _wrapper;
@@ -1014,7 +1015,7 @@ Status IRuntimeFilter::publish() {
} else {
TNetworkAddress addr;
DCHECK(_state != nullptr);
- RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr));
+ RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
return push_to_remote(_state, &addr, _opt_remote_rf);
}
}
@@ -1036,9 +1037,9 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
bool IRuntimeFilter::await() {
DCHECK(is_consumer());
auto execution_timeout = _state == nullptr ?
_query_ctx->execution_timeout() * 1000
- : _state->execution_timeout() *
1000;
+ : _state->execution_timeout *
1000;
auto runtime_filter_wait_time_ms = _state == nullptr ?
_query_ctx->runtime_filter_wait_time_ms()
- :
_state->runtime_filter_wait_time_ms();
+ :
_state->runtime_filter_wait_time_ms;
// bitmap filter is precise filter and only filter once, so it must be
applied.
int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
? execution_timeout
@@ -1234,14 +1235,14 @@ Status
IRuntimeFilter::serialize(PPublishFilterRequestV2* request, void** data,
return serialize_impl(request, data, len);
}
-Status IRuntimeFilter::create_wrapper(RuntimeState* state, const
MergeRuntimeFilterParams* param,
- ObjectPool* pool,
+Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
+ const MergeRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
return _create_wrapper(state, param, pool, wrapper);
}
-Status IRuntimeFilter::create_wrapper(RuntimeState* state, const
UpdateRuntimeFilterParams* param,
- ObjectPool* pool,
+Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
+ const UpdateRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
return _create_wrapper(state, param, pool, wrapper);
}
@@ -1290,7 +1291,8 @@ Status IRuntimeFilter::init_bloom_filter(const size_t
build_bf_cardinality) {
}
template <class T>
-Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param,
ObjectPool* pool,
+Status IRuntimeFilter::_create_wrapper(RuntimeFilterParamsContext* state,
const T* param,
+ ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 97078c11757..cc47d590e6b 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -185,7 +185,8 @@ enum RuntimeFilterState {
/// that can be pushed down to node based on the results of the right table.
class IRuntimeFilter {
public:
- IRuntimeFilter(RuntimeState* state, ObjectPool* pool, const
TRuntimeFilterDesc* desc)
+ IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool,
+ const TRuntimeFilterDesc* desc)
: _state(state),
_pool(pool),
_filter_id(desc->filter_id),
@@ -199,9 +200,9 @@ public:
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
- _wait_infinitely(_state->runtime_filter_wait_infinitely()),
- _rf_wait_time_ms(_state->runtime_filter_wait_time_ms()),
- _enable_pipeline_exec(_state->enable_pipeline_exec()),
+ _wait_infinitely(_state->runtime_filter_wait_infinitely),
+ _rf_wait_time_ms(_state->runtime_filter_wait_time_ms),
+ _enable_pipeline_exec(_state->enable_pipeline_exec),
_runtime_filter_type(get_runtime_filter_type(desc)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id,
to_string(_runtime_filter_type))),
@@ -231,9 +232,10 @@ public:
~IRuntimeFilter() = default;
- static Status create(RuntimeState* state, ObjectPool* pool, const
TRuntimeFilterDesc* desc,
- const TQueryOptions* query_options, const
RuntimeFilterRole role,
- int node_id, IRuntimeFilter** res, bool
build_bf_exactly = false);
+ static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
+ const TRuntimeFilterDesc* desc, const TQueryOptions*
query_options,
+ const RuntimeFilterRole role, int node_id,
IRuntimeFilter** res,
+ bool build_bf_exactly = false);
static Status create(QueryContext* query_ctx, ObjectPool* pool, const
TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const
RuntimeFilterRole role,
@@ -299,11 +301,11 @@ public:
Status merge_from(const RuntimePredicateWrapper* wrapper);
// for ut
- static Status create_wrapper(RuntimeState* state, const
MergeRuntimeFilterParams* param,
- ObjectPool* pool,
+ static Status create_wrapper(RuntimeFilterParamsContext* state,
+ const MergeRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
- static Status create_wrapper(RuntimeState* state, const
UpdateRuntimeFilterParams* param,
- ObjectPool* pool,
+ static Status create_wrapper(RuntimeFilterParamsContext* state,
+ const UpdateRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
static Status create_wrapper(QueryContext* query_ctx, const
UpdateRuntimeFilterParamsV2* param,
ObjectPool* pool,
@@ -325,7 +327,8 @@ public:
Status join_rpc();
// async push runtimefilter to remote node
- Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
bool opt_remote_rf);
+ Status push_to_remote(RuntimeFilterParamsContext* state, const
TNetworkAddress* addr,
+ bool opt_remote_rf);
void init_profile(RuntimeProfile* parent_profile);
@@ -367,7 +370,7 @@ public:
int32_t wait_time_ms() const {
int32_t res = 0;
if (wait_infinitely()) {
- res = _state == nullptr ? _query_ctx->execution_timeout() :
_state->execution_timeout();
+ res = _state == nullptr ? _query_ctx->execution_timeout() :
_state->execution_timeout;
// Convert to ms
res *= 1000;
} else {
@@ -391,7 +394,8 @@ protected:
Status serialize_impl(T* request, void** data, int* len);
template <class T>
- static Status _create_wrapper(RuntimeState* state, const T* param,
ObjectPool* pool,
+ static Status _create_wrapper(RuntimeFilterParamsContext* state, const T*
param,
+ ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
void _set_push_down() { _is_push_down = true; }
@@ -419,7 +423,7 @@ protected:
}
}
- RuntimeState* _state = nullptr;
+ RuntimeFilterParamsContext* _state = nullptr;
QueryContext* _query_ctx = nullptr;
ObjectPool* _pool = nullptr;
// _wrapper is a runtime filter function wrapper
diff --git a/be/src/exprs/runtime_filter_rpc.cpp
b/be/src/exprs/runtime_filter_rpc.cpp
index 00540b8382c..a9aa7944625 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -48,12 +48,12 @@ struct IRuntimeFilter::RPCContext {
static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished =
true; }
};
-Status IRuntimeFilter::push_to_remote(RuntimeState* state, const
TNetworkAddress* addr,
- bool opt_remote_rf) {
+Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state,
+ const TNetworkAddress* addr, bool
opt_remote_rf) {
DCHECK(is_producer());
DCHECK(_rpc_context == nullptr);
std::shared_ptr<PBackendService_Stub> stub(
-
state->exec_env()->brpc_internal_client_cache()->get_client(*addr));
+ state->exec_env->brpc_internal_client_cache()->get_client(*addr));
if (!stub) {
std::string msg =
fmt::format("Get rpc stub failed, host={}, port=",
addr->hostname, addr->port);
@@ -64,16 +64,16 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state,
const TNetworkAddress
int len = 0;
auto pquery_id = _rpc_context->request.mutable_query_id();
- pquery_id->set_hi(_state->query_id().hi);
- pquery_id->set_lo(_state->query_id().lo);
+ pquery_id->set_hi(_state->query_id.hi());
+ pquery_id->set_lo(_state->query_id.lo());
auto pfragment_instance_id =
_rpc_context->request.mutable_fragment_instance_id();
- pfragment_instance_id->set_hi(state->fragment_instance_id().hi);
- pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
+ pfragment_instance_id->set_hi(state->fragment_instance_id.hi());
+ pfragment_instance_id->set_lo(state->fragment_instance_id.lo());
_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
- _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
+ _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec);
_rpc_context->cntl.set_timeout_ms(wait_time_ms());
_rpc_context->cid = _rpc_context->cntl.call_id();
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index e95bef870a3..0800da22ad4 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -74,10 +74,15 @@ public:
TUniqueId get_fragment_instance_id() const { return _fragment_instance_id;
}
- virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/)
{
+ RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
return _runtime_state.get();
}
+ virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId
/*fragment_instance_id*/) {
+ return _runtime_state->runtime_filter_mgr();
+ }
+
+ QueryContext* get_query_ctx() { return _runtime_state->get_query_ctx(); }
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return
_runtime_state->is_cancelled(); }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index ac19c92ff55..724d50f55e4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -100,11 +100,6 @@
namespace doris::pipeline {
-#define FOR_EACH_RUNTIME_STATE(stmt) \
- for (auto& runtime_state : _runtime_states) { \
- stmt \
- }
-
PipelineXFragmentContext::PipelineXFragmentContext(
const TUniqueId& query_id, const int fragment_id,
std::shared_ptr<QueryContext> query_ctx,
ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>&
call_back,
@@ -114,10 +109,13 @@ PipelineXFragmentContext::PipelineXFragmentContext(
PipelineXFragmentContext::~PipelineXFragmentContext() {
auto st = _query_ctx->exec_status();
- if (!_runtime_states.empty()) {
+ if (!_task_runtime_states.empty()) {
// The memory released by the query end is recorded in the query mem
tracker, main memory in _runtime_state.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
- FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st);
runtime_state.reset();)
+ for (auto& runtime_state : _task_runtime_states) {
+ _call_back(runtime_state.get(), &st);
+ runtime_state.reset();
+ }
} else {
_call_back(nullptr, &st);
}
@@ -136,8 +134,9 @@ void PipelineXFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
}
if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
- FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext
cancel instance: "
- <<
print_id(runtime_state->fragment_instance_id());)
+ for (auto& id : _fragment_instance_ids) {
+ LOG(WARNING) << "PipelineXFragmentContext cancel instance: "
<< print_id(id);
+ }
} else {
_set_is_report_on_cancel(false); // TODO bug llj
}
@@ -229,7 +228,10 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
static_cast<void>(root_pipeline->set_sink(_sink));
- RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx));
+ if (_enable_local_shuffle()) {
+ RETURN_IF_ERROR(
+ _plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx));
+ }
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
@@ -451,43 +453,84 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
const doris::TPipelineFragmentParams& request) {
_total_tasks = 0;
int target_size = request.local_params.size();
- _runtime_states.resize(target_size);
_tasks.resize(target_size);
+ auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
+ DCHECK(pipeline_id_to_profile.empty());
+ pipeline_id_to_profile.resize(_pipelines.size());
+ {
+ size_t pip_idx = 0;
+ for (auto& pipeline_profile : pipeline_id_to_profile) {
+ pipeline_profile =
+ std::make_unique<RuntimeProfile>("Pipeline : " +
std::to_string(pip_idx));
+ pip_idx++;
+ }
+ }
+
for (size_t i = 0; i < target_size; i++) {
const auto& local_params = request.local_params[i];
+ auto fragment_instance_id = local_params.fragment_instance_id;
+ _fragment_instance_ids.push_back(fragment_instance_id);
+ std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
+ auto set_runtime_state = [&](std::unique_ptr<RuntimeState>&
runtime_state) {
+
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
- _runtime_states[i] = RuntimeState::create_unique(
- local_params.fragment_instance_id, request.query_id,
request.fragment_id,
- request.query_options, _query_ctx->query_globals, _exec_env);
- if (local_params.__isset.runtime_filter_params) {
-
_runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params);
- }
- _runtime_states[i]->set_query_ctx(_query_ctx.get());
-
_runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker);
+ runtime_state->set_query_ctx(_query_ctx.get());
+ runtime_state->set_be_number(local_params.backend_num);
- static_cast<void>(_runtime_states[i]->runtime_filter_mgr()->init());
- _runtime_states[i]->set_be_number(local_params.backend_num);
+ if (request.__isset.backend_id) {
+ runtime_state->set_backend_id(request.backend_id);
+ }
+ if (request.__isset.import_label) {
+ runtime_state->set_import_label(request.import_label);
+ }
+ if (request.__isset.db_name) {
+ runtime_state->set_db_name(request.db_name);
+ }
+ if (request.__isset.load_job_id) {
+ runtime_state->set_load_job_id(request.load_job_id);
+ }
- if (request.__isset.backend_id) {
- _runtime_states[i]->set_backend_id(request.backend_id);
- }
- if (request.__isset.import_label) {
- _runtime_states[i]->set_import_label(request.import_label);
- }
- if (request.__isset.db_name) {
- _runtime_states[i]->set_db_name(request.db_name);
+ runtime_state->set_desc_tbl(_desc_tbl);
+
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
+ runtime_state->set_num_per_fragment_instances(request.num_senders);
+ runtime_state->resize_op_id_to_local_state(max_operator_id(),
max_sink_operator_id());
+
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+ runtime_state->set_total_load_streams(request.total_load_streams);
+ runtime_state->set_num_local_sink(request.num_local_sink);
+ DCHECK(runtime_filter_mgr);
+
runtime_state->set_pipeline_x_runtime_filter_mgr(runtime_filter_mgr.get());
+ };
+
+ auto filterparams = std::make_unique<RuntimeFilterParamsContext>();
+
+ {
+ filterparams->runtime_filter_wait_infinitely =
+ _runtime_state->runtime_filter_wait_infinitely();
+ filterparams->runtime_filter_wait_time_ms =
+ _runtime_state->runtime_filter_wait_time_ms();
+ filterparams->enable_pipeline_exec =
_runtime_state->enable_pipeline_exec();
+ filterparams->execution_timeout =
_runtime_state->execution_timeout();
+
+ filterparams->exec_env = ExecEnv::GetInstance();
+ filterparams->query_id.set_hi(_runtime_state->query_id().hi);
+ filterparams->query_id.set_lo(_runtime_state->query_id().lo);
+
+ filterparams->fragment_instance_id.set_hi(fragment_instance_id.hi);
+ filterparams->fragment_instance_id.set_lo(fragment_instance_id.lo);
+ filterparams->be_exec_version = _runtime_state->be_exec_version();
+ filterparams->query_ctx = _query_ctx.get();
}
- if (request.__isset.load_job_id) {
- _runtime_states[i]->set_load_job_id(request.load_job_id);
+
+ // build runtime_filter_mgr for each instance
+ runtime_filter_mgr =
+ std::make_unique<RuntimeFilterMgr>(request.query_id,
filterparams.get());
+ if (local_params.__isset.runtime_filter_params) {
+
runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params);
}
+ RETURN_IF_ERROR(runtime_filter_mgr->init());
+ filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
- _runtime_states[i]->set_desc_tbl(_desc_tbl);
-
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
-
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
- _runtime_states[i]->resize_op_id_to_local_state(max_operator_id(),
max_sink_operator_id());
-
_runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
- _runtime_states[i]->set_total_load_streams(request.total_load_streams);
- _runtime_states[i]->set_num_local_sink(request.num_local_sink);
+ _runtime_filter_states.push_back(std::move(filterparams));
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
auto get_local_exchange_state = [&](PipelinePtr pipeline)
-> std::map<int, std::shared_ptr<LocalExchangeSharedState>> {
@@ -504,12 +547,25 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
return le_state_map;
};
- for (auto& pipeline : _pipelines) {
+ auto get_task_runtime_state = [&](int task_id) -> RuntimeState* {
+ DCHECK(_task_runtime_states[task_id]);
+ return _task_runtime_states[task_id].get();
+ };
+ for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+ auto& pipeline = _pipelines[pip_idx];
if (pipeline->need_to_create_task()) {
- auto task = std::make_unique<PipelineXTask>(pipeline,
_total_tasks++,
-
_runtime_states[i].get(), this,
-
_runtime_states[i]->runtime_profile(),
-
get_local_exchange_state(pipeline), i);
+ // build task runtime state
+ _task_runtime_states.push_back(RuntimeState::create_unique(
+ this, local_params.fragment_instance_id,
request.query_id,
+ request.fragment_id, request.query_options,
_query_ctx->query_globals,
+ _exec_env));
+ auto& task_runtime_state = _task_runtime_states.back();
+ set_runtime_state(task_runtime_state);
+ auto cur_task_id = _total_tasks++;
+ auto task = std::make_unique<PipelineXTask>(
+ pipeline, cur_task_id,
get_task_runtime_state(cur_task_id), this,
+ pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline),
+ i);
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
}
@@ -533,32 +589,18 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
* and JoinProbeOperator2.
*/
- // First, set up the parent profile,
- // then prepare the task profile and add it to
operator_id_to_task_profile.
- std::vector<RuntimeProfile*> operator_id_to_task_profile(
- max_operator_id(), _runtime_states[i]->runtime_profile());
- auto prepare_and_set_parent_profile = [&](PipelineXTask* task) {
- auto sink = task->sink();
- const auto& dests_id = sink->dests_id();
- int dest_id = dests_id.front();
- DCHECK(dest_id < operator_id_to_task_profile.size());
- task->set_parent_profile(operator_id_to_task_profile[dest_id]);
-
- RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(),
local_params,
- request.fragment.output_sink));
+ // First, set up the parent profile,task runtime state
- for (auto o : task->operatorXs()) {
- int id = o->operator_id();
- DCHECK(id < operator_id_to_task_profile.size());
- auto* op_local_state =
_runtime_states[i].get()->get_local_state(o->operator_id());
- operator_id_to_task_profile[id] = op_local_state->profile();
- }
+ auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t
pip_idx) {
+ DCHECK(pipeline_id_to_profile[pip_idx]);
+
RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()),
local_params,
+ request.fragment.output_sink));
return Status::OK();
};
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
- auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+ auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
DCHECK(task != nullptr);
// if this task has upstream dependency, then record them.
@@ -571,15 +613,12 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
}
}
- RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
+ RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
}
}
-
{
std::lock_guard<std::mutex> l(_state_map_lock);
- _instance_id_to_runtime_state.insert(
- {UniqueId(_runtime_states[i]->fragment_instance_id()),
- _runtime_states[i].get()});
+ _runtime_filter_mgr_map[fragment_instance_id] =
std::move(runtime_filter_mgr);
}
}
_pipeline_parent_map.clear();
@@ -692,7 +731,8 @@ Status PipelineXFragmentContext::_add_local_exchange(
int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr
cur_pipe,
const std::vector<TExpr>& texprs, ExchangeType exchange_type, bool*
do_local_exchange,
int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx)
{
- if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
+ DCHECK(_enable_local_shuffle());
+ if (_num_instances <= 1) {
return Status::OK();
}
@@ -1144,17 +1184,17 @@ Status PipelineXFragmentContext::submit() {
}
void PipelineXFragmentContext::close_sink() {
- FOR_EACH_RUNTIME_STATE(static_cast<void>(_sink->close(
- runtime_state.get(),
- _prepared ? Status::RuntimeError("prepare failed") :
Status::OK())););
+ for (auto& tasks : _tasks) {
+ auto& root_task = *tasks.begin();
+ auto st = root_task->close_sink(_prepared ?
Status::RuntimeError("prepare failed")
+ : Status::OK());
+ if (!st.ok()) {
+ LOG_WARNING("PipelineXFragmentContext::close_sink()
error").tag("msg", st.msg());
+ }
+ }
}
void PipelineXFragmentContext::close_if_prepare_failed() {
- if (_tasks.empty()) {
- FOR_EACH_RUNTIME_STATE(
- static_cast<void>(_root_op->close(runtime_state.get()));
static_cast<void>(
- _sink->close(runtime_state.get(),
Status::RuntimeError("prepare failed")));)
- }
for (auto& task : _tasks) {
for (auto& t : task) {
DCHECK(!t->is_pending_finish());
@@ -1196,15 +1236,15 @@ Status PipelineXFragmentContext::send_report(bool done)
{
return Status::NeedSendAgain("");
}
- std::vector<RuntimeState*> runtime_states(_runtime_states.size());
- for (size_t i = 0; i < _runtime_states.size(); i++) {
- runtime_states[i] = _runtime_states[i].get();
- }
+ std::vector<RuntimeState*> runtime_states;
+ for (auto& task_state : _task_runtime_states) {
+ runtime_states.push_back(task_state.get());
+ }
return _report_status_cb(
- {true, exec_status, runtime_states, nullptr, nullptr, done ||
!exec_status.ok(),
- _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(),
_backend_num,
- _runtime_state.get(),
+ {true, exec_status, runtime_states, nullptr,
_runtime_state->load_channel_profile(),
+ done || !exec_status.ok(), _query_ctx->coord_addr, _query_id,
_fragment_id,
+ TUniqueId(), _backend_num, _runtime_state.get(),
std::bind(&PipelineFragmentContext::update_status, this,
std::placeholders::_1),
std::bind(&PipelineFragmentContext::cancel, this,
std::placeholders::_1,
std::placeholders::_2),
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index a95a90e356d..326f1f84254 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -19,6 +19,7 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>
@@ -69,16 +70,16 @@ public:
~PipelineXFragmentContext() override;
void instance_ids(std::vector<TUniqueId>& ins_ids) const override {
- ins_ids.resize(_runtime_states.size());
- for (size_t i = 0; i < _runtime_states.size(); i++) {
- ins_ids[i] = _runtime_states[i]->fragment_instance_id();
+ ins_ids.resize(_fragment_instance_ids.size());
+ for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
+ ins_ids[i] = _fragment_instance_ids[i];
}
}
void instance_ids(std::vector<string>& ins_ids) const override {
- ins_ids.resize(_runtime_states.size());
- for (size_t i = 0; i < _runtime_states.size(); i++) {
- ins_ids[i] = print_id(_runtime_states[i]->fragment_instance_id());
+ ins_ids.resize(_fragment_instance_ids.size());
+ for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
+ ins_ids[i] = print_id(_fragment_instance_ids[i]);
}
}
@@ -102,13 +103,9 @@ public:
Status send_report(bool) override;
- RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
- std::lock_guard<std::mutex> l(_state_map_lock);
- if (_instance_id_to_runtime_state.contains(fragment_instance_id)) {
- return _instance_id_to_runtime_state[fragment_instance_id];
- } else {
- return _runtime_state.get();
- }
+ RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId fragment_instance_id)
override {
+ DCHECK(_runtime_filter_mgr_map.contains(fragment_instance_id));
+ return _runtime_filter_mgr_map[fragment_instance_id].get();
}
[[nodiscard]] int next_operator_id() { return _operator_id++; }
@@ -162,13 +159,12 @@ private:
bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
+ bool _enable_local_shuffle() const { return
_runtime_state->enable_local_shuffle(); }
+
OperatorXPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is
the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
- // Local runtime states for each pipeline task.
- std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
-
// It is used to manage the lifecycle of RuntimeFilterMergeController
std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>>
_merge_controller_handlers;
@@ -219,6 +215,19 @@ private:
int _operator_id = 0;
int _sink_operator_id = 0;
std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>>
_op_id_to_le_state;
+
+ // UniqueId -> runtime mgr
+ std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>>
_runtime_filter_mgr_map;
+
+ //Here are two types of runtime states:
+ // - _runtime state is at the Fragment level.
+ // - _task_runtime_states is at the task level, unique to each task.
+
+ std::vector<TUniqueId> _fragment_instance_ids;
+ // Local runtime states for each task
+ std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+
+ std::vector<std::unique_ptr<RuntimeFilterParamsContext>>
_runtime_filter_states;
};
} // namespace pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index cfe859b2e7c..5f07b79bebf 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -73,10 +73,11 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_prepare_timer);
+ DCHECK_EQ(state, _state);
{
// set sink local state
- LocalSinkStateInfo info {_parent_profile, local_params.sender_id,
+ LocalSinkStateInfo info {_task_profile.get(), local_params.sender_id,
get_downstream_dependency(), _le_state_map,
tsink};
RETURN_IF_ERROR(_sink->setup_local_state(state, info));
}
@@ -84,7 +85,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(),
no_scan_ranges);
- auto* parent_profile = _parent_profile;
+ auto* parent_profile = _task_profile.get();
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
@@ -330,6 +331,10 @@ Status PipelineXTask::close(Status exec_status) {
return s;
}
+Status PipelineXTask::close_sink(Status exec_status) {
+ return _sink->close(_state, exec_status);
+}
+
std::string PipelineXTask::debug_string() {
std::unique_lock<std::mutex> lc(_release_lock);
fmt::memory_buffer debug_string_buffer;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 261ecb54f24..0bb3e16fc98 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -71,6 +71,7 @@ public:
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status) override;
+ Status close_sink(Status exec_status);
bool source_can_read() override {
if (_dry_run) {
return true;
@@ -123,6 +124,8 @@ public:
OperatorXs operatorXs() { return _operators; }
+ int task_id() const { return _index; };
+
void clear_blocking_state() {
if (!is_final_state(get_state()) && get_state() !=
PipelineTaskState::PENDING_FINISH &&
_blocked_dep) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 57074bc629c..3c795273a77 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -236,32 +236,44 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
}
if (req.is_pipeline_x) {
params.__isset.detailed_report = true;
- for (auto* rs : req.runtime_states) {
- TDetailedReportParams detailed_param;
-
detailed_param.__set_fragment_instance_id(rs->fragment_instance_id());
- detailed_param.__isset.fragment_instance_id = true;
-
- if (rs->enable_profile()) {
- detailed_param.__isset.profile = true;
- detailed_param.__isset.loadChannelProfile = true;
-
- rs->runtime_profile()->to_thrift(&detailed_param.profile);
+ DCHECK(!req.runtime_states.empty());
+ const bool enable_profile =
(*req.runtime_states.begin())->enable_profile();
+ if (enable_profile) {
+ params.__isset.profile = true;
+ params.__isset.loadChannelProfile = false;
+ for (auto* rs : req.runtime_states) {
+ DCHECK(req.load_channel_profile);
+ TDetailedReportParams detailed_param;
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
+ // merge all runtime_states.loadChannelProfile to
req.load_channel_profile
+
req.load_channel_profile->update(detailed_param.loadChannelProfile);
}
-
- params.detailed_report.push_back(detailed_param);
+
req.load_channel_profile->to_thrift(¶ms.loadChannelProfile);
+ } else {
+ params.__isset.profile = false;
}
- }
- if (req.profile != nullptr) {
- req.profile->to_thrift(¶ms.profile);
- if (req.load_channel_profile) {
-
req.load_channel_profile->to_thrift(¶ms.loadChannelProfile);
+ if (enable_profile) {
+ for (auto& pipeline_profile :
req.runtime_state->pipeline_id_to_profile()) {
+ TDetailedReportParams detailed_param;
+ detailed_param.__isset.fragment_instance_id = false;
+ detailed_param.__isset.profile = true;
+ detailed_param.__isset.loadChannelProfile = false;
+ pipeline_profile->to_thrift(&detailed_param.profile);
+ params.detailed_report.push_back(detailed_param);
+ }
}
- params.__isset.profile = true;
- params.__isset.loadChannelProfile = true;
} else {
- params.__isset.profile = false;
+ if (req.profile != nullptr) {
+ req.profile->to_thrift(¶ms.profile);
+ if (req.load_channel_profile) {
+
req.load_channel_profile->to_thrift(¶ms.loadChannelProfile);
+ }
+ params.__isset.profile = true;
+ params.__isset.loadChannelProfile = true;
+ } else {
+ params.__isset.profile = false;
+ }
}
if (!req.runtime_state->output_files().empty()) {
@@ -770,8 +782,9 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params,
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
// TODO need check the status, but when I add return_if_error the P0 will
not pass
- static_cast<void>(_runtimefilter_controller.add_entity(params, &handler,
-
fragment_executor->runtime_state()));
+ static_cast<void>(_runtimefilter_controller.add_entity(
+ params, &handler,
+
RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
fragment_executor->set_merge_controller_handler(handler);
{
std::lock_guard<std::mutex> lock(_lock);
@@ -852,9 +865,9 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
for (size_t i = 0; i < params.local_params.size(); i++) {
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
- static_cast<void>(
- _runtimefilter_controller.add_entity(params,
params.local_params[i], &handler,
-
context->get_runtime_state(UniqueId())));
+ static_cast<void>(_runtimefilter_controller.add_entity(
+ params, params.local_params[i], &handler,
+
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
context->set_merge_controller_handler(handler);
const TUniqueId& fragment_instance_id =
params.local_params[i].fragment_instance_id;
{
@@ -933,7 +946,8 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
static_cast<void>(_runtimefilter_controller.add_entity(
- params, local_params, &handler,
context->get_runtime_state(UniqueId())));
+ params, local_params, &handler,
+
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
context->set_merge_controller_handler(handler);
{
@@ -1010,9 +1024,15 @@ void FragmentMgr::cancel_query_unlocked(const TUniqueId&
query_id,
LOG(WARNING) << "Query " << print_id(query_id) << " does not exists,
failed to cancel it";
return;
}
+ if (ctx->second->enable_pipeline_x_exec()) {
+ for (auto& [f_id, f_context] :
ctx->second->fragment_id_to_pipeline_ctx) {
+ cancel_fragment_unlocked(query_id, f_id, reason, state_lock, msg);
+ }
- for (auto it : ctx->second->fragment_instance_ids) {
- cancel_instance_unlocked(it, reason, state_lock, msg);
+ } else {
+ for (auto it : ctx->second->fragment_instance_ids) {
+ cancel_instance_unlocked(it, reason, state_lock, msg);
+ }
}
ctx->second->cancel(true, msg, Status::Cancelled(msg));
@@ -1054,23 +1074,50 @@ void FragmentMgr::cancel_instance_unlocked(const
TUniqueId& instance_id,
}
}
+void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t
fragment_id,
+ const PPlanFragmentCancelReason& reason,
const std::string& msg) {
+ std::unique_lock<std::mutex> state_lock(_lock);
+ return cancel_fragment_unlocked(query_id, fragment_id, reason, state_lock,
msg);
+}
+
+void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& query_id, int32_t
fragment_id,
+ const PPlanFragmentCancelReason&
reason,
+ const std::unique_lock<std::mutex>&
state_lock,
+ const std::string& msg) {
+ auto q_ctx = _query_ctx_map.find(query_id)->second;
+ auto f_context = q_ctx->fragment_id_to_pipeline_ctx.find(fragment_id);
+ if (f_context != q_ctx->fragment_id_to_pipeline_ctx.end()) {
+ f_context->second->cancel(reason, msg);
+ } else {
+ LOG(WARNING) << "Could not find the pipeline query id:" <<
print_id(query_id)
+ << " fragment id:" << fragment_id << " to cancel";
+ }
+}
+
bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
std::lock_guard<std::mutex> lock(_lock);
auto ctx = _query_ctx_map.find(query_id);
if (ctx != _query_ctx_map.end()) {
const bool is_pipeline_version = ctx->second->enable_pipeline_exec();
- for (auto itr : ctx->second->fragment_instance_ids) {
- if (is_pipeline_version) {
- auto pipeline_ctx_iter = _pipeline_map.find(itr);
- if (pipeline_ctx_iter != _pipeline_map.end() &&
pipeline_ctx_iter->second) {
- return pipeline_ctx_iter->second->is_canceled();
- }
- } else {
- auto fragment_instance_itr = _fragment_instance_map.find(itr);
- if (fragment_instance_itr != _fragment_instance_map.end() &&
- fragment_instance_itr->second) {
- return fragment_instance_itr->second->is_canceled();
+ const bool is_pipeline_x = ctx->second->enable_pipeline_x_exec();
+ if (is_pipeline_x) {
+ for (auto& [id, f_context] :
ctx->second->fragment_id_to_pipeline_ctx) {
+ return f_context->is_canceled();
+ }
+ } else {
+ for (auto itr : ctx->second->fragment_instance_ids) {
+ if (is_pipeline_version) {
+ auto pipeline_ctx_iter = _pipeline_map.find(itr);
+ if (pipeline_ctx_iter != _pipeline_map.end() &&
pipeline_ctx_iter->second) {
+ return pipeline_ctx_iter->second->is_canceled();
+ }
+ } else {
+ auto fragment_instance_itr =
_fragment_instance_map.find(itr);
+ if (fragment_instance_itr != _fragment_instance_map.end()
&&
+ fragment_instance_itr->second) {
+ return fragment_instance_itr->second->is_canceled();
+ }
}
}
}
@@ -1306,8 +1353,7 @@ Status FragmentMgr::apply_filter(const
PPublishFilterRequest* request,
pip_context = iter->second;
DCHECK(pip_context != nullptr);
- runtime_filter_mgr =
-
pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr();
+ runtime_filter_mgr =
pip_context->get_runtime_filter_mgr(fragment_instance_id);
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_instance_map.find(tfragment_instance_id);
@@ -1349,9 +1395,7 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
pip_context = iter->second;
DCHECK(pip_context != nullptr);
- runtime_filter_mgr =
pip_context->get_runtime_state(fragment_instance_id)
- ->get_query_ctx()
- ->runtime_filter_mgr();
+ runtime_filter_mgr =
pip_context->get_query_ctx()->runtime_filter_mgr();
pool = &pip_context->get_query_context()->obj_pool;
} else {
std::unique_lock<std::mutex> lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index a20da9387af..21d85503803 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -102,6 +102,14 @@ public:
const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>&
state_lock,
const std::string& msg = "");
+ // Cancel fragment (only pipelineX).
+ // {query id fragment} -> PipelineXFragmentContext
+ void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
+ const PPlanFragmentCancelReason& reason, const
std::string& msg = "");
+ void cancel_fragment_unlocked(const TUniqueId& query_id, int32_t
fragment_id,
+ const PPlanFragmentCancelReason& reason,
+ const std::unique_lock<std::mutex>&
state_lock,
+ const std::string& msg = "");
// Can be used in both version.
void cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 0e3a04f8998..6d392c56175 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -172,6 +172,11 @@ public:
_query_options.enable_pipeline_engine;
}
+ bool enable_pipeline_x_exec() const {
+ return _query_options.__isset.enable_pipeline_x_engine &&
+ _query_options.enable_pipeline_x_engine;
+ }
+
int be_exec_version() const {
if (!_query_options.__isset.be_exec_version) {
return 0;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index bca128c652a..a2120c92389 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -51,7 +51,10 @@ struct AsyncRPCContext {
brpc::CallId cid;
};
-RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState*
state) : _state(state) {}
+RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id,
RuntimeFilterParamsContext* state) {
+ _state = state;
+ _state->runtime_filter_mgr = this;
+}
RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext*
query_ctx)
: _query_ctx(query_ctx) {}
@@ -133,8 +136,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
} else {
- DCHECK(_state != nullptr);
-
if (iter != _consumer_map.end()) {
for (auto holder : iter->second) {
if (holder.node_id == node_id) {
@@ -475,7 +476,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
Status RuntimeFilterMergeController::add_entity(
const TExecPlanFragmentParams& params,
- std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
RuntimeState* state) {
+ std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
+ RuntimeFilterParamsContext* state) {
if (!params.params.__isset.runtime_filter_params ||
params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0)
{
return Status::OK();
@@ -506,7 +508,8 @@ Status RuntimeFilterMergeController::add_entity(
Status RuntimeFilterMergeController::add_entity(
const TPipelineFragmentParams& params, const TPipelineInstanceParams&
local_params,
- std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
RuntimeState* state) {
+ std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
+ RuntimeFilterParamsContext* state) {
if (!local_params.__isset.runtime_filter_params ||
local_params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
return Status::OK();
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 939ee2c8139..5f9ee46d656 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -50,6 +50,7 @@ class RuntimeState;
enum class RuntimeFilterRole;
class RuntimePredicateWrapper;
class QueryContext;
+struct RuntimeFilterParamsContext;
/// producer:
/// Filter filter;
@@ -65,7 +66,7 @@ class QueryContext;
// RuntimeFilterMgr will be destroyed when RuntimeState is destroyed
class RuntimeFilterMgr {
public:
- RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state);
+ RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext*
state);
RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);
@@ -106,7 +107,7 @@ private:
std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map;
std::map<int32_t, IRuntimeFilter*> _producer_map;
- RuntimeState* _state = nullptr;
+ RuntimeFilterParamsContext* _state = nullptr;
QueryContext* _query_ctx = nullptr;
std::unique_ptr<MemTracker> _tracker;
ObjectPool _pool;
@@ -123,7 +124,7 @@ private:
// the class is destroyed with the last fragment_exec.
class RuntimeFilterMergeControllerEntity {
public:
- RuntimeFilterMergeControllerEntity(RuntimeState* state)
+ RuntimeFilterMergeControllerEntity(RuntimeFilterParamsContext* state)
: _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {}
~RuntimeFilterMergeControllerEntity() = default;
@@ -172,7 +173,7 @@ private:
using CntlValwithLock =
std::pair<std::shared_ptr<RuntimeFilterCntlVal>,
std::unique_ptr<std::mutex>>;
std::map<int, CntlValwithLock> _filter_map;
- RuntimeState* _state = nullptr;
+ RuntimeFilterParamsContext* _state = nullptr;
bool _opt_remote_rf = true;
};
@@ -188,11 +189,11 @@ public:
// add_entity will return a exists entity
Status add_entity(const TExecPlanFragmentParams& params,
std::shared_ptr<RuntimeFilterMergeControllerEntity>*
handle,
- RuntimeState* state);
+ RuntimeFilterParamsContext* state);
Status add_entity(const TPipelineFragmentParams& params,
const TPipelineInstanceParams& local_params,
std::shared_ptr<RuntimeFilterMergeControllerEntity>*
handle,
- RuntimeState* state);
+ RuntimeFilterParamsContext* state);
// thread safe
// increase a reference count
// if a query-id is not exist
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 43eff466019..9082a5a322d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -54,7 +54,6 @@ RuntimeState::RuntimeState(const TUniqueId&
fragment_instance_id,
: _profile("Fragment " + print_id(fragment_instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
- _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_is_cancelled(false),
@@ -71,6 +70,8 @@ RuntimeState::RuntimeState(const TUniqueId&
fragment_instance_id,
_error_log_file(nullptr) {
Status status = init(fragment_instance_id, query_options, query_globals,
exec_env);
DCHECK(status.ok());
+ _runtime_filter_mgr.reset(
+ new RuntimeFilterMgr(TUniqueId(),
RuntimeFilterParamsContext::create(this)));
}
RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
@@ -79,7 +80,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
: _profile("Fragment " +
print_id(fragment_exec_params.fragment_instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
- _runtime_filter_mgr(new
RuntimeFilterMgr(fragment_exec_params.query_id, this)),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(fragment_exec_params.query_id),
@@ -94,12 +94,14 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
_normal_row_number(0),
_error_row_number(0),
_error_log_file(nullptr) {
- if (fragment_exec_params.__isset.runtime_filter_params) {
-
_runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
- }
Status status =
init(fragment_exec_params.fragment_instance_id, query_options,
query_globals, exec_env);
DCHECK(status.ok());
+ _runtime_filter_mgr.reset(new
RuntimeFilterMgr(fragment_exec_params.query_id,
+
RuntimeFilterParamsContext::create(this)));
+ if (fragment_exec_params.__isset.runtime_filter_params) {
+
_runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
+ }
}
RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId&
query_id,
@@ -108,7 +110,36 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id,
const TUniqueId& query_
: _profile("Fragment " + print_id(instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
- _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)),
+ _data_stream_recvrs_pool(new ObjectPool()),
+ _unreported_error_idx(0),
+ _query_id(query_id),
+ _fragment_id(fragment_id),
+ _is_cancelled(false),
+ _per_fragment_instance_idx(0),
+ _num_rows_load_total(0),
+ _num_rows_load_filtered(0),
+ _num_rows_load_unselected(0),
+ _num_rows_filtered_in_strict_mode_partial_update(0),
+ _num_print_error_rows(0),
+ _num_bytes_load_total(0),
+ _num_finished_scan_range(0),
+ _normal_row_number(0),
+ _error_row_number(0),
+ _error_log_file(nullptr) {
+ [[maybe_unused]] auto status = init(instance_id, query_options,
query_globals, exec_env);
+ DCHECK(status.ok());
+ _runtime_filter_mgr.reset(
+ new RuntimeFilterMgr(query_id,
RuntimeFilterParamsContext::create(this)));
+}
+
+RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const
TUniqueId& instance_id,
+ const TUniqueId& query_id, int32_t fragment_id,
+ const TQueryOptions& query_options, const
TQueryGlobals& query_globals,
+ ExecEnv* exec_env)
+ : _profile("Fragment " + print_id(instance_id)),
+ _load_channel_profile("<unnamed>"),
+ _obj_pool(new ObjectPool()),
+ _runtime_filter_mgr(nullptr),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(query_id),
@@ -135,7 +166,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id,
int32_t fragment_id,
: _profile("PipelineX " + std::to_string(fragment_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
- _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(query_id),
@@ -155,6 +185,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id,
int32_t fragment_id,
// TODO: do we really need instance id?
Status status = init(TUniqueId(), query_options, query_globals, exec_env);
DCHECK(status.ok());
+ _runtime_filter_mgr.reset(
+ new RuntimeFilterMgr(query_id,
RuntimeFilterParamsContext::create(this)));
}
RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
@@ -485,4 +517,21 @@ bool RuntimeState::enable_page_cache() const {
(_query_options.__isset.enable_page_cache &&
_query_options.enable_page_cache);
}
+RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState*
state) {
+ RuntimeFilterParamsContext* params = state->obj_pool()->add(new
RuntimeFilterParamsContext());
+ params->runtime_filter_wait_infinitely =
state->runtime_filter_wait_infinitely();
+ params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
+ params->enable_pipeline_exec = state->enable_pipeline_exec();
+ params->execution_timeout = state->execution_timeout();
+ params->runtime_filter_mgr = state->runtime_filter_mgr();
+ params->exec_env = state->exec_env();
+ params->query_id.set_hi(state->query_id().hi);
+ params->query_id.set_lo(state->query_id().lo);
+
+ params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
+ params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
+ params->be_exec_version = state->be_exec_version();
+ params->query_ctx = state->get_query_ctx();
+ return params;
+}
} // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e37883abbe1..e064e6e7610 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -47,6 +47,7 @@ namespace doris {
namespace pipeline {
class PipelineXLocalStateBase;
class PipelineXSinkLocalStateBase;
+class PipelineXFragmentContext;
} // namespace pipeline
class DescriptorTbl;
@@ -74,6 +75,11 @@ public:
const TQueryOptions& query_options, const TQueryGlobals&
query_globals,
ExecEnv* exec_env);
+ // for only use in pipelineX
+ RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId&
instance_id,
+ const TUniqueId& query_id, int32 fragment_id, const
TQueryOptions& query_options,
+ const TQueryGlobals& query_globals, ExecEnv* exec_env);
+
// Used by pipelineX. This runtime state is only used for setup.
RuntimeState(const TUniqueId& query_id, int32 fragment_id, const
TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);
@@ -437,7 +443,17 @@ public:
// if load mem limit is not set, or is zero, using query mem limit instead.
int64_t get_load_mem_limit();
- RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get();
}
+ RuntimeFilterMgr* runtime_filter_mgr() {
+ if (_pipeline_x_runtime_filter_mgr) {
+ return _pipeline_x_runtime_filter_mgr;
+ } else {
+ return _runtime_filter_mgr.get();
+ }
+ }
+
+ void set_pipeline_x_runtime_filter_mgr(RuntimeFilterMgr*
pipeline_x_runtime_filter_mgr) {
+ _pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr;
+ }
void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; }
@@ -513,6 +529,8 @@ public:
void resize_op_id_to_local_state(int operator_size, int sink_size);
+ auto& pipeline_id_to_profile() { return _pipeline_id_to_profile; }
+
private:
Status create_error_log_file();
@@ -531,6 +549,9 @@ private:
// runtime filter
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
+ // owned by PipelineXFragmentContext
+ RuntimeFilterMgr* _pipeline_x_runtime_filter_mgr = nullptr;
+
// Protects _data_stream_recvrs_pool
std::mutex _data_stream_recvrs_lock;
@@ -623,10 +644,30 @@ private:
// true if max_filter_ratio is 0
bool _load_zero_tolerance = false;
+ std::vector<std::unique_ptr<RuntimeProfile>> _pipeline_id_to_profile;
+
// prohibit copies
RuntimeState(const RuntimeState&);
};
+// from runtime state
+struct RuntimeFilterParamsContext {
+ RuntimeFilterParamsContext() = default;
+ static RuntimeFilterParamsContext* create(RuntimeState* state);
+
+ bool runtime_filter_wait_infinitely;
+ int32_t runtime_filter_wait_time_ms;
+ bool enable_pipeline_exec;
+ int32_t execution_timeout;
+ RuntimeFilterMgr* runtime_filter_mgr;
+ ExecEnv* exec_env;
+ PUniqueId query_id;
+ PUniqueId fragment_instance_id;
+ int be_exec_version;
+ QueryContext* query_ctx;
+ QueryContext* get_query_ctx() const { return query_ctx; }
+};
+
#define RETURN_IF_CANCELLED(state)
\
do {
\
if (UNLIKELY((state)->is_cancelled())) return
Status::Cancelled("Cancelled"); \
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 987a2106894..24152c67089 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -593,10 +593,19 @@ void
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
has_cancel_reason
?
PPlanFragmentCancelReason_Name(request->cancel_reason())
: "INTERNAL_ERROR");
-
- _exec_env->fragment_mgr()->cancel_instance(
- tid, has_cancel_reason ? request->cancel_reason()
- :
PPlanFragmentCancelReason::INTERNAL_ERROR);
+ if (request->has_fragment_id()) {
+ TUniqueId query_id;
+ query_id.__set_hi(request->query_id().hi());
+ query_id.__set_lo(request->query_id().lo());
+ _exec_env->fragment_mgr()->cancel_fragment(
+ query_id, request->fragment_id(),
+ has_cancel_reason ? request->cancel_reason()
+ :
PPlanFragmentCancelReason::INTERNAL_ERROR);
+ } else {
+ _exec_env->fragment_mgr()->cancel_instance(
+ tid, has_cancel_reason ? request->cancel_reason()
+ :
PPlanFragmentCancelReason::INTERNAL_ERROR);
+ }
// TODO: the logic seems useless, cancel only return Status::OK.
remove it
st.to_protobuf(result->mutable_status());
diff --git a/be/test/exprs/runtime_filter_test.cpp
b/be/test/exprs/runtime_filter_test.cpp
index b8cce3fbf7d..9739c3930ef 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -103,8 +103,9 @@ IRuntimeFilter*
create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio
}
IRuntimeFilter* runtime_filter = nullptr;
- Status status = IRuntimeFilter::create(_runtime_stat, _obj_pool, &desc,
options,
- RuntimeFilterRole::PRODUCER, -1,
&runtime_filter);
+ Status status =
IRuntimeFilter::create(RuntimeFilterParamsContext::create(_runtime_stat),
+ _obj_pool, &desc, options,
RuntimeFilterRole::PRODUCER,
+ -1, &runtime_filter);
EXPECT_TRUE(status.ok()) << status.to_string();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 465bb977226..ecee299f104 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -63,9 +63,14 @@ public class ExecutionProfile {
// Profile for load channels. Only for load job.
private RuntimeProfile loadChannelProfile;
// A countdown latch to mark the completion of each instance.
+ // use for old pipeline
// instance id -> dummy value
private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
+ // A countdown latch to mark the completion of each fragment. use for
pipelineX
+ // fragmentId -> dummy value
+ private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;
+
public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
executionProfile = new RuntimeProfile("Execution Profile " +
DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
@@ -79,7 +84,35 @@ public class ExecutionProfile {
executionProfile.addChild(loadChannelProfile);
}
- public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String>
planNodeMap) {
+ private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String>
planNodeMap) {
+ RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
+ for (int i = 0; i < fragmentProfiles.size(); ++i) {
+ RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
+ RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment "
+ i);
+ fragmentsProfile.addChild(newFragmentProfile);
+ List<RuntimeProfile> allPipelines = new
ArrayList<RuntimeProfile>();
+ for (Pair<RuntimeProfile, Boolean> runtimeProfile :
oldFragmentProfile.getChildList()) {
+ allPipelines.add(runtimeProfile.first);
+ }
+ int pipelineIdx = 0;
+ for (RuntimeProfile pipeline : allPipelines) {
+ List<RuntimeProfile> allPipelineTask = new
ArrayList<RuntimeProfile>();
+ for (Pair<RuntimeProfile, Boolean> runtimeProfile :
pipeline.getChildList()) {
+ allPipelineTask.add(runtimeProfile.first);
+ }
+ RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
+ "Pipeline : " + pipelineIdx + "(instance_num="
+ + allPipelineTask.size() + ")",
+ allPipelines.get(0).nodeId());
+ RuntimeProfile.mergeProfiles(allPipelineTask,
mergedpipelineProfile, planNodeMap);
+ newFragmentProfile.addChild(mergedpipelineProfile);
+ pipelineIdx++;
+ }
+ }
+ return fragmentsProfile;
+ }
+
+ private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer,
String> planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
@@ -97,6 +130,54 @@ public class ExecutionProfile {
return fragmentsProfile;
}
+ public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String>
planNodeMap) {
+ if (enablePipelineX()) {
+ /*
+ * Fragment 0
+ * ---Pipeline 0
+ * ------pipelineTask 0
+ * ------pipelineTask 0
+ * ------pipelineTask 0
+ * ---Pipeline 1
+ * ------pipelineTask 1
+ * ---Pipeline 2
+ * ------pipelineTask 2
+ * ------pipelineTask 2
+ * Fragment 1
+ * ---Pipeline 0
+ * ------......
+ * ---Pipeline 1
+ * ------......
+ * ---Pipeline 2
+ * ------......
+ * ......
+ */
+ return getPipelineXAggregatedProfile(planNodeMap);
+ } else {
+ /*
+ * Fragment 0
+ * ---Instance 0
+ * ------pipelineTask 0
+ * ------pipelineTask 1
+ * ------pipelineTask 2
+ * ---Instance 1
+ * ------pipelineTask 0
+ * ------pipelineTask 1
+ * ------pipelineTask 2
+ * ---Instance 2
+ * ------pipelineTask 0
+ * ------pipelineTask 1
+ * ------pipelineTask 2
+ * Fragment 1
+ * ---Instance 0
+ * ---Instance 1
+ * ---Instance 2
+ * ......
+ */
+ return getNonPipelineXAggregatedProfile(planNodeMap);
+ }
+ }
+
public RuntimeProfile getExecutionProfile() {
return executionProfile;
}
@@ -120,6 +201,17 @@ public class ExecutionProfile {
}
}
+ private boolean enablePipelineX() {
+ return profileFragmentDoneSignal != null;
+ }
+
+ public void markFragments(int fragments) {
+ profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
+ for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
+ profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is
meaningless */);
+ }
+ }
+
public void update(long startTime, boolean isFinished) {
if (startTime > 0) {
executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS,
TimeUtils.getElapsedTimeMs(startTime));
@@ -133,6 +225,14 @@ public class ExecutionProfile {
}
}
+ if (isFinished && profileFragmentDoneSignal != null) {
+ try {
+ profileFragmentDoneSignal.await(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e1) {
+ LOG.warn("signal await error", e1);
+ }
+ }
+
for (RuntimeProfile fragmentProfile : fragmentProfiles) {
fragmentProfile.sortChildren();
}
@@ -143,6 +243,9 @@ public class ExecutionProfile {
// count down to zero to notify all objects waiting for this
profileDoneSignal.countDownToZero(new Status());
}
+ if (profileFragmentDoneSignal != null) {
+ profileFragmentDoneSignal.countDownToZero(new Status());
+ }
}
public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
@@ -153,6 +256,14 @@ public class ExecutionProfile {
}
}
+ public void markOneFragmentDone(int fragmentId) {
+ if (profileFragmentDoneSignal != null) {
+ if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
+ LOG.warn("Mark fragment {} done failed", fragmentId);
+ }
+ }
+ }
+
public boolean awaitAllInstancesDone(long waitTimeS) throws
InterruptedException {
if (profileDoneSignal == null) {
return true;
@@ -160,6 +271,13 @@ public class ExecutionProfile {
return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
}
+ public boolean awaitAllFragmentsDone(long waitTimeS) throws
InterruptedException {
+ if (profileFragmentDoneSignal == null) {
+ return true;
+ }
+ return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
+ }
+
public boolean isAllInstancesDone() {
if (profileDoneSignal == null) {
return true;
@@ -167,9 +285,16 @@ public class ExecutionProfile {
return profileDoneSignal.getCount() == 0;
}
- public void addInstanceProfile(int instanceIdx, RuntimeProfile
instanceProfile) {
- Preconditions.checkArgument(instanceIdx < fragmentProfiles.size(),
- instanceIdx + " vs. " + fragmentProfiles.size());
- fragmentProfiles.get(instanceIdx).addChild(instanceProfile);
+ public boolean isAllFragmentsDone() {
+ if (profileFragmentDoneSignal == null) {
+ return true;
+ }
+ return profileFragmentDoneSignal.getCount() == 0;
+ }
+
+ public void addInstanceProfile(int fragmentId, RuntimeProfile
instanceProfile) {
+ Preconditions.checkArgument(fragmentId < fragmentProfiles.size(),
+ fragmentId + " vs. " + fragmentProfiles.size());
+ fragmentProfiles.get(fragmentId).addChild(instanceProfile);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5188412bd3a..ba1499fae11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -148,6 +148,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class Coordinator implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@@ -677,7 +678,12 @@ public class Coordinator implements CoordInterface {
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId),
scanRangeNum);
LOG.info("dispatch load job: {} to {}",
DebugUtil.printId(queryId), addressToBackendID.keySet());
}
- executionProfile.markInstances(instanceIds);
+ if (enablePipelineXEngine) {
+ executionProfile.markFragments(fragments.size());
+ } else {
+ executionProfile.markInstances(instanceIds);
+ }
+
if (enablePipelineEngine) {
sendPipelineCtx();
} else {
@@ -894,7 +900,8 @@ public class Coordinator implements CoordInterface {
Long backendId =
this.addressToBackendID.get(entry.getKey());
PipelineExecContext pipelineExecContext = new
PipelineExecContext(fragment.getFragmentId(),
profileFragmentId, entry.getValue(), backendId,
fragmentInstancesMap,
- executionProfile.getLoadChannelProfile());
+ executionProfile.getLoadChannelProfile(),
this.enablePipelineXEngine,
+ this.executionProfile);
// Each tParam will set the total number of Fragments that
need to be executed on the same BE,
// and the BE will determine whether all Fragments have
been executed based on this information.
// Notice. load fragment has a small probability that
FragmentNumOnHost is 0, for unknown reasons.
@@ -2459,7 +2466,7 @@ public class Coordinator implements CoordInterface {
public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (enablePipelineXEngine) {
PipelineExecContext ctx =
pipelineExecContexts.get(Pair.of(params.getFragmentId(),
params.getBackendId()));
- if (!ctx.updateProfile(params, true)) {
+ if (!ctx.updateProfile(params)) {
return;
}
@@ -2503,16 +2510,14 @@ public class Coordinator implements CoordInterface {
}
Preconditions.checkArgument(params.isSetDetailedReport());
- for (TDetailedReportParams param : params.detailed_report) {
- if
(ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
- LOG.debug("Query {} instance {} is marked done",
- DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
-
executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
- }
+ if (ctx.done) {
+ LOG.debug("Query {} fragment {} is marked done",
+ DebugUtil.printId(queryId), ctx.profileFragmentId);
+ executionProfile.markOneFragmentDone(ctx.profileFragmentId);
}
} else if (enablePipelineEngine) {
PipelineExecContext ctx =
pipelineExecContexts.get(Pair.of(params.getFragmentId(),
params.getBackendId()));
- if (!ctx.updateProfile(params, false)) {
+ if (!ctx.updateProfile(params)) {
return;
}
@@ -2657,7 +2662,11 @@ public class Coordinator implements CoordInterface {
long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
boolean awaitRes = false;
try {
- awaitRes = executionProfile.awaitAllInstancesDone(waitTime);
+ if (enablePipelineXEngine) {
+ awaitRes =
executionProfile.awaitAllFragmentsDone(waitTime);
+ } else {
+ awaitRes =
executionProfile.awaitAllInstancesDone(waitTime);
+ }
} catch (InterruptedException e) {
// Do nothing
}
@@ -2700,7 +2709,11 @@ public class Coordinator implements CoordInterface {
}
public boolean isDone() {
- return executionProfile.isAllInstancesDone();
+ if (enablePipelineXEngine) {
+ return executionProfile.isAllFragmentsDone();
+ } else {
+ return executionProfile.isAllInstancesDone();
+ }
}
// map from a BE host address to the per-node assigned scan ranges;
@@ -3092,9 +3105,13 @@ public class Coordinator implements CoordInterface {
boolean initiated;
volatile boolean done;
boolean hasCanceled;
+ // use for pipeline
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
+ // use for pipelineX
+ List<RuntimeProfile> taskProfile;
+
+ boolean enablePipelineX;
RuntimeProfile loadChannelProfile;
- int cancelProgress = 0;
int profileFragmentId;
TNetworkAddress brpcAddress;
TNetworkAddress address;
@@ -3103,16 +3120,18 @@ public class Coordinator implements CoordInterface {
long profileReportProgress = 0;
long beProcessEpoch = 0;
private final int numInstances;
+ final ExecutionProfile executionProfile;
public PipelineExecContext(PlanFragmentId fragmentId, int
profileFragmentId,
TPipelineFragmentParams rpcParams, Long backendId,
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap,
- RuntimeProfile loadChannelProfile) {
+ RuntimeProfile loadChannelProfile, boolean enablePipelineX,
final ExecutionProfile executionProfile) {
this.profileFragmentId = profileFragmentId;
this.fragmentId = fragmentId;
this.rpcParams = rpcParams;
this.numInstances = rpcParams.local_params.size();
this.fragmentInstancesMap = fragmentInstancesMap;
+ this.taskProfile = new ArrayList<RuntimeProfile>();
this.loadChannelProfile = loadChannelProfile;
this.initiated = false;
@@ -3125,12 +3144,27 @@ public class Coordinator implements CoordInterface {
this.hasCanceled = false;
this.lastMissingHeartbeatTime =
backend.getLastMissingHeartbeatTime();
+ this.enablePipelineX = enablePipelineX;
+ this.executionProfile = executionProfile;
+ }
+
+ /**
+ * Streams the profiles owned by this context: the per-pipeline task profiles rebuilt on each
+ * report under pipelineX, otherwise the per-instance profiles.
+ */
+ public Stream<RuntimeProfile> profileStream() {
+ return enablePipelineX
+ ? taskProfile.stream()
+ : fragmentInstancesMap.values().stream();
+ }
+
+ /**
+ * Adds every profile from profileStream() as a child of this context's fragment profile
+ * in the execution profile.
+ */
+ private void attachInstanceProfileToFragmentProfile() {
+ final int fragmentIdx = this.profileFragmentId;
+ profileStream().forEach(profile -> executionProfile.addInstanceProfile(fragmentIdx, profile));
}
/**
* Some information common to all Fragments does not need to be sent
repeatedly.
* Therefore, when we confirm that a certain BE has accepted the
information,
- * we will delete the information in the subsequent Fragment to avoid
repeated sending.
+ * we will delete the information in the subsequent Fragment to avoid
repeated
+ * sending.
* This information can be obtained from the cache of BE.
*/
public void unsetFields() {
@@ -3144,29 +3178,31 @@ public class Coordinator implements CoordInterface {
// update profile.
// return true if profile is updated. Otherwise, return false.
- public synchronized boolean updateProfile(TReportExecStatusParams
params, boolean isPipelineX) {
- if (isPipelineX) {
+ public synchronized boolean updateProfile(TReportExecStatusParams
params) {
+ if (enablePipelineX) {
+ taskProfile.clear();
+ int pipelineIdx = 0;
for (TDetailedReportParams param : params.detailed_report) {
- RuntimeProfile profile =
fragmentInstancesMap.get(param.fragment_instance_id);
- if (params.done && profile.getIsDone()) {
- continue;
- }
-
+ String name = "Pipeline :" + pipelineIdx + " "
+ + " (host=" + address + ")";
+ RuntimeProfile profile = new RuntimeProfile(name);
+ taskProfile.add(profile);
if (param.isSetProfile()) {
profile.update(param.profile);
}
- if (params.isSetLoadChannelProfile()) {
- loadChannelProfile.update(params.loadChannelProfile);
- }
if (params.done) {
profile.setIsDone(true);
- profileReportProgress++;
}
+ pipelineIdx++;
}
- if (profileReportProgress == numInstances) {
- this.done = true;
+ if (params.isSetLoadChannelProfile()) {
+ loadChannelProfile.update(params.loadChannelProfile);
}
- return true;
+ this.done = params.done;
+ if (this.done) {
+ attachInstanceProfileToFragmentProfile();
+ }
+ return this.done;
} else {
RuntimeProfile profile =
fragmentInstancesMap.get(params.fragment_instance_id);
if (params.done && profile.getIsDone()) {
@@ -3192,7 +3228,7 @@ public class Coordinator implements CoordInterface {
}
public synchronized void printProfile(StringBuilder builder) {
- this.fragmentInstancesMap.values().stream().forEach(p -> {
+ this.profileStream().forEach(p -> {
p.computeTimeInProfile();
p.prettyPrint(builder, "");
});
@@ -3200,23 +3236,41 @@ public class Coordinator implements CoordInterface {
// cancel all fragment instances.
// return true if cancel success. Otherwise, return false
- public synchronized boolean
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
- if (!this.initiated) {
- LOG.warn("Query {}, ccancel before initiated",
DebugUtil.printId(queryId));
+
+ /**
+ * Cancels every task of this pipelineX fragment on the backend at brpcAddress with a single
+ * fragment-level cancel RPC, and marks all task profiles as canceled.
+ * cancelFragmentInstance() already returns early when this context is not initiated, is done, or
+ * has been canceled, so no hasCanceled guard belongs here: an `if (!hasCanceled) return false;`
+ * guard would always fire and turn this method into a no-op. hasCanceled is set once the RPC has
+ * been issued, or the backend has been blacklisted after an RpcException, so the cancel is not
+ * sent twice.
+ *
+ * @param cancelReason reason forwarded to the backend
+ * @return true if the cancel was issued, false if an unexpected exception occurred
+ */
+ private synchronized boolean
cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
- return false;
- }
- // don't cancel if it is already finished
- if (this.done) {
- LOG.warn("Query {}, cancel after finished",
DebugUtil.printId(queryId));
- return false;
+ for (RuntimeProfile profile : taskProfile) {
+ profile.setIsCancel(true);
}
- if (this.hasCanceled) {
- LOG.warn("Query {}, cancel after cancelled",
DebugUtil.printId(queryId));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cancelRemoteFragments initiated={} done={}
hasCanceled={} backend: {},"
+ + " fragment id={} query={}, reason: {}",
+ this.initiated, this.done, this.hasCanceled,
backend.getId(),
+ this.profileFragmentId,
+ DebugUtil.printId(queryId), cancelReason.name());
+ }
+ try {
+ try {
+
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
+ this.profileFragmentId, queryId, cancelReason);
+ } catch (RpcException e) {
+ LOG.warn("cancel plan fragment get a exception,
address={}:{}", brpcAddress.getHostname(),
+ brpcAddress.getPort());
+
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress),
e.getMessage());
+ }
+ // Record the cancel so cancelFragmentInstance() will not send it again.
+ this.hasCanceled = true;
+ } catch (Exception e) {
+ LOG.warn("catch a exception", e);
return false;
}
+ return true;
+ }
+
+ private synchronized boolean
cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
for (TPipelineInstanceParams localParam : rpcParams.local_params) {
LOG.warn("cancelRemoteFragments initiated={} done={}
hasCanceled={} backend:{},"
- + " fragment instance id={} query={}, reason:
{}",
+ + " fragment instance id={} query={}, reason: {}",
this.initiated, this.done, this.hasCanceled,
backend.getId(),
DebugUtil.printId(localParam.fragment_instance_id),
DebugUtil.printId(queryId), cancelReason.name());
@@ -3244,14 +3298,35 @@ public class Coordinator implements CoordInterface {
if (!this.hasCanceled) {
return false;
}
-
for (int i = 0; i < this.numInstances; i++) {
fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
}
- cancelProgress = numInstances;
return true;
}
+ /**
+ * Cancels this context's work on its backend: the whole fragment under pipelineX
+ * (cancelFragment), otherwise each fragment instance (cancelInstance).
+ * Logs a warning and returns false without cancelling when the context was never initiated,
+ * has already finished, or has already been canceled.
+ * TODO: refactor rpcParams
+ *
+ * @param cancelReason reason forwarded to the backend
+ * @return the result of cancelFragment()/cancelInstance(), or false if the cancel was skipped
+ */
+ public synchronized boolean
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
+ if (!this.initiated) {
+ LOG.warn("Query {}, cancel before initiated",
DebugUtil.printId(queryId));
+ return false;
+ }
+ // don't cancel if it is already finished
+ if (this.done) {
+ LOG.warn("Query {}, cancel after finished",
DebugUtil.printId(queryId));
+ return false;
+ }
+ if (this.hasCanceled) {
+ LOG.warn("Query {}, cancel after cancelled",
DebugUtil.printId(queryId));
+ return false;
+ }
+
+ if (this.enablePipelineX) {
+ return cancelFragment(cancelReason);
+ } else {
+ return cancelInstance(cancelReason);
+ }
+ }
+
public synchronized boolean computeTimeInProfile(int maxFragmentId) {
if (this.profileFragmentId < 0 || this.profileFragmentId >
maxFragmentId) {
LOG.warn("profileFragmentId {} should be in [0, {})",
profileFragmentId, maxFragmentId);
@@ -3843,7 +3918,7 @@ public class Coordinator implements CoordInterface {
private void attachInstanceProfileToFragmentProfile() {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
- ctx.fragmentInstancesMap.values().stream()
+ ctx.profileStream()
.forEach(p ->
executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
}
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 02245c83ced..52350d805bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -240,6 +240,24 @@ public class BackendServiceProxy {
}
}
+ /**
+ * Asynchronously asks the backend at {@code address} to cancel one fragment of a query
+ * (pipelineX fragment-level cancel). The request carries query_id and fragment_id; the
+ * required finst_id field is filled with a placeholder id (hi=0, lo=0).
+ *
+ * @param address backend brpc address
+ * @param fragmentId fragment to cancel
+ * @param queryId query the fragment belongs to
+ * @param cancelReason reason forwarded to the backend
+ * @return future of the cancel RPC result
+ * @throws RpcException if obtaining the client or sending the request fails
+ */
+ public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
+ int fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
+ final Types.PUniqueId placeholderInstanceId = Types.PUniqueId.newBuilder().setHi(0).setLo(0).build();
+ final InternalService.PCancelPlanFragmentRequest.Builder requestBuilder =
+ InternalService.PCancelPlanFragmentRequest.newBuilder()
+ .setFinstId(placeholderInstanceId)
+ .setCancelReason(cancelReason)
+ .setFragmentId(fragmentId);
+ final Types.PUniqueId pQueryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
+ final InternalService.PCancelPlanFragmentRequest pRequest = requestBuilder.setQueryId(pQueryId).build();
+ try {
+ return getProxy(address).cancelPlanFragmentAsync(pRequest);
+ } catch (Throwable e) {
+ LOG.warn("Cancel plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(),
+ e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
public Future<InternalService.PFetchDataResult> fetchDataAsync(
TNetworkAddress address, InternalService.PFetchDataRequest
request) throws RpcException {
try {
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index ec3714d618a..46e2e194f06 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -229,6 +229,8 @@ message PExecPlanFragmentResult {
message PCancelPlanFragmentRequest {
required PUniqueId finst_id = 1;
optional PPlanFragmentCancelReason cancel_reason = 2;
+ // query_id and fragment_id are set by the FE pipelineX fragment-level cancel
+ // (BackendServiceProxy.cancelPipelineXPlanFragmentAsync), which fills the required
+ // finst_id with a placeholder id (hi=0, lo=0).
+ optional PUniqueId query_id = 3;
+ optional int32 fragment_id = 4;
};
message PCancelPlanFragmentResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]