This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b4dbb087c0c [Bug](top-n) init query_ctx runtime predicate before operators prepare #31876
b4dbb087c0c is described below

commit b4dbb087c0c0dd7e511d92d66efff2f64568bda3
Author: Pxl <[email protected]>
AuthorDate: Wed Mar 6 19:09:10 2024 +0800

    [Bug](top-n) init query_ctx runtime predicate before operators prepare #31876
---
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 733cdfe2b50..daae5feecfe 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -236,6 +236,18 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
                                              
request.bucket_seq_to_instance_idx,
                                              
request.shuffle_idx_to_instance_idx));
     }
+
+    const auto& local_params = request.local_params[0];
+    if (local_params.__isset.runtime_filter_params) {
+        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                local_params.runtime_filter_params);
+    }
+    if (local_params.__isset.topn_filter_source_node_ids) {
+        _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+    } else {
+        _query_ctx->init_runtime_predicates({0});
+    }
+
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
         pipeline->children().clear();
@@ -523,17 +535,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         // build local_runtime_filter_mgr for each instance
         runtime_filter_mgr =
                 std::make_unique<RuntimeFilterMgr>(request.query_id, filterparams.get());
-        if (i == 0) {
-            if (local_params.__isset.runtime_filter_params) {
-                _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                        local_params.runtime_filter_params);
-            }
-            if (local_params.__isset.topn_filter_source_node_ids) {
-                _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
-            } else {
-                _query_ctx->init_runtime_predicates({0});
-            }
-        }
+
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
         _runtime_filter_states.push_back(std::move(filterparams));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to