This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b4dbb087c0c [Bug](top-n) init query_ctx runtime predicate before operators prepare #31876
b4dbb087c0c is described below
commit b4dbb087c0c0dd7e511d92d66efff2f64568bda3
Author: Pxl <[email protected]>
AuthorDate: Wed Mar 6 19:09:10 2024 +0800
[Bug](top-n) init query_ctx runtime predicate before operators prepare #31876
---
.../pipeline_x/pipeline_x_fragment_context.cpp | 24 ++++++++++++----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 733cdfe2b50..daae5feecfe 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -236,6 +236,18 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
}
+
+ const auto& local_params = request.local_params[0];
+ if (local_params.__isset.runtime_filter_params) {
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ local_params.runtime_filter_params);
+ }
+ if (local_params.__isset.topn_filter_source_node_ids) {
+ _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+ } else {
+ _query_ctx->init_runtime_predicates({0});
+ }
+
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
pipeline->children().clear();
@@ -523,17 +535,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
// build local_runtime_filter_mgr for each instance
runtime_filter_mgr =
std::make_unique<RuntimeFilterMgr>(request.query_id,
filterparams.get());
- if (i == 0) {
- if (local_params.__isset.runtime_filter_params) {
- _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
- local_params.runtime_filter_params);
- }
- if (local_params.__isset.topn_filter_source_node_ids) {
- _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
- } else {
- _query_ctx->init_runtime_predicates({0});
- }
- }
+
filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
_runtime_filter_states.push_back(std::move(filterparams));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]