This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 85824c52035 branch-4.0: [opt](nereids)move runtime filter info from instance level to BE level #56978 (#57047)
85824c52035 is described below

commit 85824c520359b3f0ad972d138b4ac84f9cbce80f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 18:53:35 2025 +0800

    branch-4.0: [opt](nereids)move runtime filter info from instance level to BE level #56978 (#57047)
    
    Cherry-picked from #56978
    
    Co-authored-by: minghong <[email protected]>
---
 .../doris/qe/runtime/ThriftPlansBuilder.java       | 39 ++++++++++++----------
 gensrc/thrift/PaloInternalService.thrift           |  4 +--
 2 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 1364d31e879..5fcd14fcb79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -56,6 +56,7 @@ import org.apache.doris.thrift.TPipelineInstanceParams;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TPlanFragmentDestination;
 import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TRuntimeFilterInfo;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TTopnFilterDesc;
@@ -116,9 +117,7 @@ public class ThriftPlansBuilder {
                         workerProcessInstanceNum, coordinatorContext);
 
                 TPipelineInstanceParams instanceParam = instanceToThrift(
-                        currentFragmentParam, instanceJob, runtimeFiltersThriftBuilder,
-                        topNFilterThriftSupplier, currentInstanceIndex++
-                );
+                        currentFragmentParam, instanceJob, currentInstanceIndex++);
                 currentFragmentParam.getLocalParams().add(instanceParam);
             }
 
@@ -126,7 +125,8 @@ public class ThriftPlansBuilder {
             // so we can merge and send multiple fragment to a backend use one rpc
             for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv : workerToCurrentFragment.entrySet()) {
                 TPipelineFragmentParamsList fragments = fragmentsGroupByWorker.computeIfAbsent(
-                        kv.getKey(), w -> new TPipelineFragmentParamsList());
+                        kv.getKey(), w -> beToThrift(runtimeFiltersThriftBuilder,
+                                topNFilterThriftSupplier));
                 fragments.addToParamsList(kv.getValue());
             }
         }
@@ -297,6 +297,22 @@ public class ThriftPlansBuilder {
         return destination;
     }
 
+    private static TPipelineFragmentParamsList beToThrift(
+            RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
+            Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
+        TPipelineFragmentParamsList beParam = new TPipelineFragmentParamsList();
+        TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
+        runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());
+
+        // set for runtime filter
+        TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
+        runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
+        runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
+        runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
+        beParam.setRuntimeFilterInfo(runtimeFilterInfo);
+        return beParam;
+    }
+
     private static TPipelineFragmentParams fragmentToThriftIfAbsent(
             PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
             Map<DistributedPlanWorker, TPipelineFragmentParams> workerToFragmentParams,
@@ -415,26 +431,13 @@ public class ThriftPlansBuilder {
     }
 
     private static TPipelineInstanceParams instanceToThrift(
-            TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
-            RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
-            Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier, int currentInstanceNum) {
+            TPipelineFragmentParams currentFragmentParam, AssignedJob instance, int currentInstanceNum) {
         TPipelineInstanceParams instanceParam = new TPipelineInstanceParams();
         instanceParam.setFragmentInstanceId(instance.instanceId());
         setScanSourceParam(currentFragmentParam, instance, instanceParam);
 
         instanceParam.setSenderId(instance.indexInUnassignedJob());
         instanceParam.setBackendNum(currentInstanceNum);
-        instanceParam.setRuntimeFilterParams(new TRuntimeFilterParams());
-
-        instanceParam.setTopnFilterDescs(topNFilterThriftSupplier.get());
-
-        // set for runtime filter
-        TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
-        runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
-        instanceParam.setRuntimeFilterParams(runtimeFilterParams);
-        if (runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
-            runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
-        }
         boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
         if (isLocalShuffle) {
             // a fragment in a backend only enable local shuffle once for the first local shuffle instance,
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 658ee87d3e4..7f2ac6a8c1c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -393,14 +393,14 @@ struct TQueryOptions {
   162: optional bool dump_heap_profile_when_mem_limit_exceeded = false
   163: optional bool inverted_index_compatible_read = false
   164: optional bool check_orc_init_sargs_success = false
-  165: optional i32 exchange_multi_blocks_byte_size = 262144 
+  165: optional i32 exchange_multi_blocks_byte_size = 262144
   // true to use strict cast mode.
   166: optional bool enable_strict_cast = false
   167: optional bool new_version_unix_timestamp = false
 
   168: optional i32 hnsw_ef_search = 32;
   169: optional bool hnsw_check_relative_distance = true;
-  170: optional bool hnsw_bounded_queue = true; 
+  170: optional bool hnsw_bounded_queue = true;
 
   171: optional bool optimize_index_scan_parallelism = false;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to