This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 981ea73466421e909d9ed4001edee4aa7baa1806 Author: Pxl <[email protected]> AuthorDate: Thu Mar 7 18:49:26 2024 +0800 [Bug](top-n) init query_ctx runtime predicate before _build_pipelines (#31896) init query_ctx runtime predicate before _build_pipelines --- be/src/pipeline/pipeline_fragment_context.cpp | 13 ------------- be/src/runtime/fragment_mgr.cpp | 19 ++++++++++++++++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index b4c5646402e..6a5bd87e199 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -244,19 +244,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state = RuntimeState::create_unique( local_params.fragment_instance_id, request.query_id, request.fragment_id, request.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); - if (idx == 0) { - if (local_params.__isset.runtime_filter_params) { - if (local_params.__isset.runtime_filter_params) { - _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( - local_params.runtime_filter_params); - } - } - if (local_params.__isset.topn_filter_source_node_ids) { - _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); - } else { - _query_ctx->init_runtime_predicates({0}); - } - } _runtime_state->set_task_execution_context(shared_from_this()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e7d4771d615..cf00ec1086f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -887,10 +887,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, g_fragmentmgr_prepare_latency << (duration_ns / 1000); std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; - static_cast<void>(_runtimefilter_controller.add_entity( + RETURN_IF_ERROR(_runtimefilter_controller.add_entity( local_params, params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - if (i == 0 and handler) { + if (i == 0 && handler) { query_ctx->set_merge_controller_handler(handler); } { @@ -905,6 +905,19 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, int target_size = params.local_params.size(); g_pipeline_fragment_instances_count << target_size; + const auto& local_params = params.local_params[0]; + if (local_params.__isset.runtime_filter_params) { + if (local_params.__isset.runtime_filter_params) { + query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_params.runtime_filter_params); + } + } + if (local_params.__isset.topn_filter_source_node_ids) { + query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); + } else { + query_ctx->init_runtime_predicates({0}); + } + if (target_size > 1) { int prepare_done = {0}; Status prepare_status[target_size]; @@ -912,7 +925,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::condition_variable cv; for (size_t i = 0; i < target_size; i++) { - static_cast<void>(_thread_pool->submit_func([&, i]() { + RETURN_IF_ERROR(_thread_pool->submit_func([&, i]() { prepare_status[i] = pre_and_submit(i); std::unique_lock<std::mutex> lock(m); prepare_done++; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
