This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 981ea73466421e909d9ed4001edee4aa7baa1806
Author: Pxl <[email protected]>
AuthorDate: Thu Mar 7 18:49:26 2024 +0800

    [Bug](top-n) init query_ctx runtime predicate before _build_pipelines (#31896)
    
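    Initialize the query_ctx runtime predicates before _build_pipelines: the
    runtime filter params and topn runtime predicate setup moves out of
    PipelineFragmentContext::prepare (where it ran only for idx == 0) and into
    FragmentMgr::exec_plan_fragment, ahead of the pre_and_submit calls.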
---
 be/src/pipeline/pipeline_fragment_context.cpp | 13 -------------
 be/src/runtime/fragment_mgr.cpp               | 19 ++++++++++++++++---
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b4c5646402e..6a5bd87e199 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -244,19 +244,6 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     _runtime_state = RuntimeState::create_unique(
             local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
             request.query_options, _query_ctx->query_globals, _exec_env, 
_query_ctx.get());
-    if (idx == 0) {
-        if (local_params.__isset.runtime_filter_params) {
-            if (local_params.__isset.runtime_filter_params) {
-                _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                        local_params.runtime_filter_params);
-            }
-        }
-        if (local_params.__isset.topn_filter_source_node_ids) {
-            
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
-        } else {
-            _query_ctx->init_runtime_predicates({0});
-        }
-    }
 
     _runtime_state->set_task_execution_context(shared_from_this());
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e7d4771d615..cf00ec1086f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -887,10 +887,10 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-            static_cast<void>(_runtimefilter_controller.add_entity(
+            RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
                     local_params, params.query_id, params.query_options, 
&handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state())));
-            if (i == 0 and handler) {
+            if (i == 0 && handler) {
                 query_ctx->set_merge_controller_handler(handler);
             }
             {
@@ -905,6 +905,19 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         int target_size = params.local_params.size();
         g_pipeline_fragment_instances_count << target_size;
 
+        const auto& local_params = params.local_params[0];
+        if (local_params.__isset.runtime_filter_params) {
+            if (local_params.__isset.runtime_filter_params) {
+                query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                        local_params.runtime_filter_params);
+            }
+        }
+        if (local_params.__isset.topn_filter_source_node_ids) {
+            
query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+        } else {
+            query_ctx->init_runtime_predicates({0});
+        }
+
         if (target_size > 1) {
             int prepare_done = {0};
             Status prepare_status[target_size];
@@ -912,7 +925,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             std::condition_variable cv;
 
             for (size_t i = 0; i < target_size; i++) {
-                static_cast<void>(_thread_pool->submit_func([&, i]() {
+                RETURN_IF_ERROR(_thread_pool->submit_func([&, i]() {
                     prepare_status[i] = pre_and_submit(i);
                     std::unique_lock<std::mutex> lock(m);
                     prepare_done++;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to