This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 85824c52035 branch-4.0: [opt](nereids)move runtime filter info from
instance level to BE level #56978 (#57047)
85824c52035 is described below
commit 85824c520359b3f0ad972d138b4ac84f9cbce80f
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 18:53:35 2025 +0800
branch-4.0: [opt](nereids)move runtime filter info from instance level to
BE level #56978 (#57047)
Cherry-picked from #56978
Co-authored-by: minghong <[email protected]>
---
.../doris/qe/runtime/ThriftPlansBuilder.java | 39 ++++++++++++----------
gensrc/thrift/PaloInternalService.thrift | 4 +--
2 files changed, 23 insertions(+), 20 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 1364d31e879..5fcd14fcb79 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -56,6 +56,7 @@ import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TPlanFragmentDestination;
import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TRuntimeFilterInfo;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TTopnFilterDesc;
@@ -116,9 +117,7 @@ public class ThriftPlansBuilder {
workerProcessInstanceNum, coordinatorContext);
TPipelineInstanceParams instanceParam = instanceToThrift(
- currentFragmentParam, instanceJob,
runtimeFiltersThriftBuilder,
- topNFilterThriftSupplier, currentInstanceIndex++
- );
+ currentFragmentParam, instanceJob,
currentInstanceIndex++);
currentFragmentParam.getLocalParams().add(instanceParam);
}
@@ -126,7 +125,8 @@ public class ThriftPlansBuilder {
// so we can merge and send multiple fragment to a backend use one
rpc
for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv :
workerToCurrentFragment.entrySet()) {
TPipelineFragmentParamsList fragments =
fragmentsGroupByWorker.computeIfAbsent(
- kv.getKey(), w -> new TPipelineFragmentParamsList());
+ kv.getKey(), w ->
beToThrift(runtimeFiltersThriftBuilder,
+ topNFilterThriftSupplier));
fragments.addToParamsList(kv.getValue());
}
}
@@ -297,6 +297,22 @@ public class ThriftPlansBuilder {
return destination;
}
+ private static TPipelineFragmentParamsList beToThrift(
+ RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
+ Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
+ TPipelineFragmentParamsList beParam = new
TPipelineFragmentParamsList();
+ TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
+ runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());
+
+ // set for runtime filter
+ TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
+
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
+ runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
+
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
+ beParam.setRuntimeFilterInfo(runtimeFilterInfo);
+ return beParam;
+ }
+
private static TPipelineFragmentParams fragmentToThriftIfAbsent(
PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
Map<DistributedPlanWorker, TPipelineFragmentParams>
workerToFragmentParams,
@@ -415,26 +431,13 @@ public class ThriftPlansBuilder {
}
private static TPipelineInstanceParams instanceToThrift(
- TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
- RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
- Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier, int
currentInstanceNum) {
+ TPipelineFragmentParams currentFragmentParam, AssignedJob
instance, int currentInstanceNum) {
TPipelineInstanceParams instanceParam = new TPipelineInstanceParams();
instanceParam.setFragmentInstanceId(instance.instanceId());
setScanSourceParam(currentFragmentParam, instance, instanceParam);
instanceParam.setSenderId(instance.indexInUnassignedJob());
instanceParam.setBackendNum(currentInstanceNum);
- instanceParam.setRuntimeFilterParams(new TRuntimeFilterParams());
-
- instanceParam.setTopnFilterDescs(topNFilterThriftSupplier.get());
-
- // set for runtime filter
- TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
-
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
- instanceParam.setRuntimeFilterParams(runtimeFilterParams);
- if
(runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
-
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
- }
boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
if (isLocalShuffle) {
// a fragment in a backend only enable local shuffle once for the
first local shuffle instance,
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 658ee87d3e4..7f2ac6a8c1c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -393,14 +393,14 @@ struct TQueryOptions {
162: optional bool dump_heap_profile_when_mem_limit_exceeded = false
163: optional bool inverted_index_compatible_read = false
164: optional bool check_orc_init_sargs_success = false
- 165: optional i32 exchange_multi_blocks_byte_size = 262144
+ 165: optional i32 exchange_multi_blocks_byte_size = 262144
// true to use strict cast mode.
166: optional bool enable_strict_cast = false
167: optional bool new_version_unix_timestamp = false
168: optional i32 hnsw_ef_search = 32;
169: optional bool hnsw_check_relative_distance = true;
- 170: optional bool hnsw_bounded_queue = true;
+ 170: optional bool hnsw_bounded_queue = true;
171: optional bool optimize_index_scan_parallelism = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]