This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 606223ab62c Revert "[refactor](pipeline) simplify runtime state ctor
(#25995)" (#26029)
606223ab62c is described below
commit 606223ab62c2408b17fdb019ff824977cd1d14a7
Author: Gabriel <[email protected]>
AuthorDate: Fri Oct 27 18:15:30 2023 +0800
Revert "[refactor](pipeline) simplify runtime state ctor (#25995)" (#26029)
This reverts commit a01922cdc55e2b3a63d9a9aafb38ac5ed64c6dd3.
---
be/src/pipeline/pipeline_fragment_context.cpp | 9 +++------
.../pipeline_x/pipeline_x_fragment_context.cpp | 9 +++------
be/src/runtime/runtime_state.cpp | 21 +++++++++++----------
be/src/runtime/runtime_state.h | 8 +++-----
4 files changed, 20 insertions(+), 27 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 38e293367bf..686ff0fe0ad 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -221,12 +221,9 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
local_params.backend_num);
// 1. init _runtime_state
- _runtime_state = RuntimeState::create_unique(
- local_params.fragment_instance_id, request.query_id,
request.fragment_id,
- request.query_options, _query_ctx->query_globals, _exec_env);
- if (local_params.__isset.runtime_filter_params) {
-
_runtime_state->set_runtime_filter_params(local_params.runtime_filter_params);
- }
+ _runtime_state = RuntimeState::create_unique(local_params,
request.query_id,
+ request.fragment_id,
request.query_options,
+ _query_ctx->query_globals,
_exec_env);
_runtime_state->set_query_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 832d86128d2..5ca5829e1a8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -390,12 +390,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
for (size_t i = 0; i < target_size; i++) {
const auto& local_params = request.local_params[i];
- _runtime_states[i] = RuntimeState::create_unique(
- local_params.fragment_instance_id, request.query_id,
request.fragment_id,
- request.query_options, _query_ctx->query_globals, _exec_env);
- if (local_params.__isset.runtime_filter_params) {
-
_runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params);
- }
+ _runtime_states[i] = RuntimeState::create_unique(local_params,
request.query_id,
+ request.fragment_id,
request.query_options,
+
_query_ctx->query_globals, _exec_env);
_runtime_states[i]->set_query_ctx(_query_ctx.get());
_runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_states[i]->set_tracer(_runtime_state->get_tracer());
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d86c0cbb01d..7a24b86621b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -101,10 +101,11 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
DCHECK(status.ok());
}
-RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId&
query_id,
- int32_t fragment_id, const TQueryOptions&
query_options,
- const TQueryGlobals& query_globals, ExecEnv*
exec_env)
- : _profile("Fragment " + print_id(instance_id)),
+RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
+ const TUniqueId& query_id, int32_t fragment_id,
+ const TQueryOptions& query_options, const
TQueryGlobals& query_globals,
+ ExecEnv* exec_env)
+ : _profile("Fragment " +
print_id(pipeline_params.fragment_instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
_runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)),
@@ -124,7 +125,12 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id,
const TUniqueId& query_
_normal_row_number(0),
_error_row_number(0),
_error_log_file(nullptr) {
- DCHECK(init(instance_id, query_options, query_globals, exec_env).ok());
+ if (pipeline_params.__isset.runtime_filter_params) {
+
_runtime_filter_mgr->set_runtime_filter_params(pipeline_params.runtime_filter_params);
+ }
+ Status status =
+ init(pipeline_params.fragment_instance_id, query_options,
query_globals, exec_env);
+ DCHECK(status.ok());
}
RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
@@ -267,11 +273,6 @@ Status RuntimeState::init(const TUniqueId&
fragment_instance_id, const TQueryOpt
return Status::OK();
}
-void RuntimeState::set_runtime_filter_params(
- const TRuntimeFilterParams& runtime_filter_params) const {
- _runtime_filter_mgr->set_runtime_filter_params(runtime_filter_params);
-}
-
void RuntimeState::init_mem_trackers(const TUniqueId& id, const std::string&
name) {
_query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::EXPERIMENTAL, fmt::format("{}#Id={}",
name, print_id(id)));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f33b4c4febe..29ef581947c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -72,9 +72,9 @@ public:
const TQueryOptions& query_options, const TQueryGlobals&
query_globals,
ExecEnv* exec_env);
- RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id,
int32 fragment_id,
- const TQueryOptions& query_options, const TQueryGlobals&
query_globals,
- ExecEnv* exec_env);
+ RuntimeState(const TPipelineInstanceParams& pipeline_params, const
TUniqueId& query_id,
+ int32 fragment_id, const TQueryOptions& query_options,
+ const TQueryGlobals& query_globals, ExecEnv* exec_env);
// Used by pipelineX. This runtime state is only used for setup.
RuntimeState(const TUniqueId& query_id, int32 fragment_id, const
TQueryOptions& query_options,
@@ -93,8 +93,6 @@ public:
Status init(const TUniqueId& fragment_instance_id, const TQueryOptions&
query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);
- void set_runtime_filter_params(const TRuntimeFilterParams&
runtime_filter_params) const;
-
// for ut and non-query.
void set_exec_env(ExecEnv* exec_env) { _exec_env = exec_env; }
void init_mem_trackers(const TUniqueId& id = TUniqueId(), const
std::string& name = "unknown");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]