This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 98d05810d16 [opt](nereids)share topnFilterDescs among instances
(#61648)
98d05810d16 is described below
commit 98d05810d16ee47d390532e7725b4bf51e29fed7
Author: minghong <[email protected]>
AuthorDate: Fri Mar 27 19:40:16 2026 +0800
[opt](nereids)share topnFilterDescs among instances (#61648)
Before the fix, TTopnFilterDesc objects were created inside the pipeline
instance loop — once per instance of every fragment. With N topn filters
and M total instances across all fragments, this created N×M Thrift
objects, even though every instance received an identical copy of the
same N descriptors.
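After the fix, the list is created once per fragment before the loop,
and the same list reference is shared across all instances of that
fragment, so the shared list must be treated as read-only once it has
been attached to the instance params. Object count drops from
O(filters × instances) to O(filters × fragments); the saving equals the
number of instances per fragment — typically a 100-1000x reduction for
large tables with many pipeline instances.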
---
.../main/java/org/apache/doris/qe/Coordinator.java | 16 ++-
.../java/org/apache/doris/qe/CoordinatorTest.java | 139 +++++++++++++++++++++
2 files changed, 149 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5af6ec65fa4..0ee6ac94560 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3233,6 +3233,14 @@ public class Coordinator implements CoordInterface {
Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
TPlanFragment fragmentThrift = fragment.toThrift();
fragmentThrift.query_cache_param = fragment.queryCacheParam;
+ // Pre-compute topn filter descs once; all instances share the
same data.
+ List<TTopnFilterDesc> topnFilterDescs = null;
+ if (!topnFilters.isEmpty()) {
+ topnFilterDescs = new ArrayList<>();
+ for (TopnFilter filter : topnFilters) {
+ topnFilterDescs.add(filter.toThrift());
+ }
+ }
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam =
instanceExecParams.get(i);
Map<Integer, List<TScanRangeParams>> scanRanges =
instanceExecParam.perNodeScanRanges;
@@ -3304,12 +3312,8 @@ public class Coordinator implements CoordInterface {
localParams.setBackendNum(backendNum++);
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
- if (!topnFilters.isEmpty()) {
- List<TTopnFilterDesc> filterDescs = new ArrayList<>();
- for (TopnFilter filter : topnFilters) {
- filterDescs.add(filter.toThrift());
- }
- localParams.setTopnFilterDescs(filterDescs);
+ if (topnFilterDescs != null) {
+ localParams.setTopnFilterDescs(topnFilterDescs);
}
if
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
Set<Integer> broadCastRf =
assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
new file mode 100644
index 00000000000..cc406858366
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineInstanceParams;
+import org.apache.doris.thrift.TTopnFilterDesc;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CoordinatorTest extends TestWithFeService {
+
+ @BeforeAll
+ public void init() throws Exception {
+ FeConstants.runningUnitTest = true;
+ createDatabase("test");
+ useDatabase("test");
+ createTable("create table tbl(id int, v int) distributed by hash(id)"
+ + " buckets 3 properties('replication_num' = '1');");
+ }
+
+ /**
+ * Verifies that FragmentExecParams.toThrift() pre-computes
topnFilterDescs once and shares
+ * the same list object across all TPipelineInstanceParams when
instanceExecParams.size() > 1.
+ *
+ * Before the fix, a new List was created per instance inside the loop.
+ * After the fix, one List is created outside the loop and shared by all
instances.
+ */
+ @Test
+ public void testTopnFilterDescsSharedAmongInstances() throws Exception {
+ NereidsPlanner planner = plan("select * from test.tbl order by id
limit 5");
+ List<TopnFilter> topnFilters = planner.getTopnFilters();
+
+ Assumptions.assumeTrue(!topnFilters.isEmpty(),
+ "Query did not generate topn filters; test skipped");
+
+ Coordinator coordinator = (Coordinator) EnvFactory.getInstance()
+ .createCoordinator(connectContext, planner, null);
+
+ // prepare() populates fragmentExecParamsMap; it is protected and
accessible
+ // from this package.
+ coordinator.prepare();
+
+ // Access private fragmentExecParamsMap via reflection.
+ Field mapField =
Coordinator.class.getDeclaredField("fragmentExecParamsMap");
+ mapField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Map<PlanFragmentId, Coordinator.FragmentExecParams> fragMap =
+ (Map<PlanFragmentId, Coordinator.FragmentExecParams>)
mapField.get(coordinator);
+
+ Assertions.assertFalse(fragMap.isEmpty());
+
+ // Pick any fragment and inject 3 fake instances on the same host so
that
+ // toThrift() groups them into one TPipelineFragmentParams with 3
local entries.
+ Coordinator.FragmentExecParams fragParams =
fragMap.values().iterator().next();
+ fragParams.instanceExecParams.clear();
+
+ TNetworkAddress host = new TNetworkAddress("127.0.0.1", 9060);
+ fragParams.instanceExecParams.add(
+ new Coordinator.FInstanceExecParam(new TUniqueId(1L, 1L),
host, fragParams));
+ fragParams.instanceExecParams.add(
+ new Coordinator.FInstanceExecParam(new TUniqueId(1L, 2L),
host, fragParams));
+ fragParams.instanceExecParams.add(
+ new Coordinator.FInstanceExecParam(new TUniqueId(1L, 3L),
host, fragParams));
+
+ // toThrift() is package-private and accessible from this package.
+ Map<TNetworkAddress, TPipelineFragmentParams> result =
fragParams.toThrift(0);
+
+ TPipelineFragmentParams pipelineParams = result.get(host);
+ Assertions.assertNotNull(pipelineParams);
+
+ List<TPipelineInstanceParams> localParamsList =
pipelineParams.getLocalParams();
+ Assertions.assertEquals(3, localParamsList.size());
+
+ // Every instance must have topn_filter_descs set and non-empty.
+ for (TPipelineInstanceParams lp : localParamsList) {
+ Assertions.assertNotNull(lp.getTopnFilterDescs());
+ Assertions.assertFalse(lp.getTopnFilterDescs().isEmpty());
+ }
+
+ // All instances must reference the SAME list object (not merely equal
copies).
+ // This is the invariant introduced by the commit: the list is built
once outside
+ // the instance loop and shared across all instances.
+ List<TTopnFilterDesc> shared =
localParamsList.get(0).getTopnFilterDescs();
+ for (int i = 1; i < localParamsList.size(); i++) {
+ Assertions.assertSame(shared,
localParamsList.get(i).getTopnFilterDescs(),
+ "instance " + i + " must share the same topnFilterDescs
list object");
+ }
+ }
+
+ private NereidsPlanner plan(String sql) throws IOException {
+ connectContext.getSessionVariable().setDisableNereidsRules(
+ "PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
+ connectContext.getSessionVariable().topNLazyMaterializationThreshold =
-1;
+ // The test table is empty (0 rows). The topn-filter condition is:
+ // Math.max(rowCount, 1) * topnFilterRatio > limit
+ // With default ratio=0.5: Math.max(0,1)*0.5=0.5 which is NOT > 5, so
no filter.
+ // Set ratio > 5 so that even an empty table (rowCount=0) passes the
check.
+ connectContext.getSessionVariable().topnFilterRatio = 10.0;
+ connectContext.setThreadLocalInfo();
+ UUID uuid = UUID.randomUUID();
+ connectContext.setQueryId(
+ new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()));
+ return PlanChecker.from(connectContext).plan(sql);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]