This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e05ceb52f36 branch-4.0: [fix](nereids)set RuntimeFilterInfo only on BE
which is merge node #57108 (#57199)
e05ceb52f36 is described below
commit e05ceb52f3649768d060695b754671b0d55c105f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 23 13:08:20 2025 +0800
branch-4.0: [fix](nereids)set RuntimeFilterInfo only on BE which is merge
node #57108 (#57199)
Cherry-picked from #57108
Co-authored-by: minghong <[email protected]>
---
.../trees/plans/distribute/worker/BackendWorker.java | 2 +-
.../doris/qe/runtime/RuntimeFiltersThriftBuilder.java | 15 ++++++---------
.../org/apache/doris/qe/runtime/ThriftPlansBuilder.java | 11 ++++++++---
gensrc/thrift/PaloInternalService.thrift | 1 +
4 files changed, 16 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
index 63c73b50edc..e76934cf847 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
@@ -45,7 +45,7 @@ public class BackendWorker implements DistributedPlanWorker {
@Override
public String brpcAddress() {
- return backend.getHost() + brpcPort();
+ return backend.getHost() + ":" + brpcPort();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
index f9ab8e83f07..fca5461fe1c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
@@ -65,7 +65,7 @@ public class RuntimeFiltersThriftBuilder {
return mergeInstance == instance;
}
- public void setRuntimeFilterThriftParams(TRuntimeFilterParams
runtimeFilterParams) {
+ public void populateRuntimeFilterParams(TRuntimeFilterParams
runtimeFilterParams) {
for (RuntimeFilter rf : runtimeFilters) {
List<RuntimeFilterTarget> targets =
ridToTargets.get(rf.getFilterId());
if (targets == null) {
@@ -89,8 +89,7 @@ public class RuntimeFiltersThriftBuilder {
}
runtimeFilterParams.putToRidToTargetParamv2(
- rf.getFilterId().asInt(), new
ArrayList<>(targetToParams.values())
- );
+ rf.getFilterId().asInt(), new
ArrayList<>(targetToParams.values()));
}
}
for (Map.Entry<RuntimeFilterId, Integer> entry :
ridToBuilderNum.entrySet()) {
@@ -122,15 +121,14 @@ public class RuntimeFiltersThriftBuilder {
PlanFragment fragment = plan.getFragmentJob().getFragment();
// Transform <fragment, runtimeFilterId> to <runtimeFilterId,
fragment>
for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) {
- List<RuntimeFilterTarget> targetFragments =
- ridToTargetParam.computeIfAbsent(rid, k -> new
ArrayList<>());
+ List<RuntimeFilterTarget> targetFragments =
ridToTargetParam.computeIfAbsent(rid,
+ k -> new ArrayList<>());
for (AssignedJob instanceJob : plan.getInstanceJobs()) {
BackendWorker backendWorker = (BackendWorker)
instanceJob.getAssignedWorker();
Backend backend = backendWorker.getBackend();
targetFragments.add(new RuntimeFilterTarget(
fragment.getFragmentId().asInt(),
- new TNetworkAddress(backend.getHost(),
backend.getBrpcPort())
- ));
+ new TNetworkAddress(backend.getHost(),
backend.getBrpcPort())));
}
}
@@ -146,8 +144,7 @@ public class RuntimeFiltersThriftBuilder {
}
return new RuntimeFiltersThriftBuilder(
mergeAddress, runtimeFilters, mergeInstance,
- broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum
- );
+ broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum);
}
public static class RuntimeFilterTarget {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 5fcd14fcb79..394e0cd5b1c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -125,7 +125,7 @@ public class ThriftPlansBuilder {
// so we can merge and send multiple fragment to a backend use one
rpc
for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv :
workerToCurrentFragment.entrySet()) {
TPipelineFragmentParamsList fragments =
fragmentsGroupByWorker.computeIfAbsent(
- kv.getKey(), w ->
beToThrift(runtimeFiltersThriftBuilder,
+ kv.getKey(), w -> beToThrift(kv.getKey(),
runtimeFiltersThriftBuilder,
topNFilterThriftSupplier));
fragments.addToParamsList(kv.getValue());
}
@@ -298,18 +298,23 @@ public class ThriftPlansBuilder {
}
private static TPipelineFragmentParamsList beToThrift(
+ DistributedPlanWorker worker,
RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
TPipelineFragmentParamsList beParam = new
TPipelineFragmentParamsList();
TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());
- // set for runtime filter
TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
+ if
(worker.host().equals(runtimeFiltersThriftBuilder.mergeAddress.getHostname())
+ && worker.brpcPort() ==
runtimeFiltersThriftBuilder.mergeAddress.getPort()) {
+ // only set following information for merge BE node
+
runtimeFiltersThriftBuilder.populateRuntimeFilterParams(runtimeFilterParams);
+ }
runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
-
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
beParam.setRuntimeFilterInfo(runtimeFilterInfo);
+
return beParam;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index f9490fce799..a5a1de2a300 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -441,6 +441,7 @@ struct TRuntimeFilterParams {
// Runtime filter merge instance address. Used if this filter has a remote
target
1: optional Types.TNetworkAddress runtime_filter_merge_addr
+ // keep 2/3/4/5 unset if BE is not used for merge
// deprecated
2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org