This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2913a111f60 [Bug](top-n) init query_ctx runtime predicate before _build_pipelines #31895
2913a111f60 is described below
commit 2913a111f601ef44fb73a862a00d4d3aa2a5bb6b
Author: Pxl <[email protected]>
AuthorDate: Thu Mar 7 10:19:42 2024 +0800
[Bug](top-n) init query_ctx runtime predicate before _build_pipelines #31895
---
.../pipeline_x/pipeline_x_fragment_context.cpp | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index daae5feecfe..1b717ec3c3b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -208,6 +208,17 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);
+ const auto& local_params = request.local_params[0];
+ if (local_params.__isset.runtime_filter_params) {
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ local_params.runtime_filter_params);
+ }
+ if (local_params.__isset.topn_filter_source_node_ids) {
+ _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+ } else {
+ _query_ctx->init_runtime_predicates({0});
+ }
+
_need_local_merge =
request.__isset.parallel_instances &&
(request.__isset.per_node_shared_scans &&
!request.per_node_shared_scans.empty());
@@ -237,17 +248,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
request.shuffle_idx_to_instance_idx));
}
- const auto& local_params = request.local_params[0];
- if (local_params.__isset.runtime_filter_params) {
- _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
- local_params.runtime_filter_params);
- }
- if (local_params.__isset.topn_filter_source_node_ids) {
- _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
- } else {
- _query_ctx->init_runtime_predicates({0});
- }
-
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
pipeline->children().clear();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]