This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 98d05810d16 [opt](nereids)share topnFilterDescs among instances (#61648)
98d05810d16 is described below

commit 98d05810d16ee47d390532e7725b4bf51e29fed7
Author: minghong <[email protected]>
AuthorDate: Fri Mar 27 19:40:16 2026 +0800

    [opt](nereids)share topnFilterDescs among instances (#61648)
    
    Before this change, TTopnFilterDesc objects were created inside the
    pipeline instance loop, i.e. once per instance per fragment. With N topn
    filters and M total instances across all fragments, this created N×M
    Thrift objects, even though every instance of a fragment received
    identical content.
    
    After this change, the list is created once per fragment before the loop,
    and the same list reference is shared across all instances of that
    fragment. Object count drops from O(filters × instances) to O(filters ×
    fragments), i.e. by a factor equal to the average number of instances per
    fragment — typically a 100-1000x reduction for large tables with many
    pipeline instances.
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  16 ++-
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 139 +++++++++++++++++++++
 2 files changed, 149 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5af6ec65fa4..0ee6ac94560 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3233,6 +3233,14 @@ public class Coordinator implements CoordInterface {
             Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
             TPlanFragment fragmentThrift = fragment.toThrift();
             fragmentThrift.query_cache_param = fragment.queryCacheParam;
+            // Pre-compute topn filter descs once; all instances share the same data.
+            List<TTopnFilterDesc> topnFilterDescs = null;
+            if (!topnFilters.isEmpty()) {
+                topnFilterDescs = new ArrayList<>();
+                for (TopnFilter filter : topnFilters) {
+                    topnFilterDescs.add(filter.toThrift());
+                }
+            }
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                 final FInstanceExecParam instanceExecParam = 
instanceExecParams.get(i);
                 Map<Integer, List<TScanRangeParams>> scanRanges = 
instanceExecParam.perNodeScanRanges;
@@ -3304,12 +3312,8 @@ public class Coordinator implements CoordInterface {
                 localParams.setBackendNum(backendNum++);
                 localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
                 
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
-                if (!topnFilters.isEmpty()) {
-                    List<TTopnFilterDesc> filterDescs = new ArrayList<>();
-                    for (TopnFilter filter : topnFilters) {
-                        filterDescs.add(filter.toThrift());
-                    }
-                    localParams.setTopnFilterDescs(filterDescs);
+                if (topnFilterDescs != null) {
+                    localParams.setTopnFilterDescs(topnFilterDescs);
                 }
                 if 
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
                     Set<Integer> broadCastRf = 
assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
new file mode 100644
index 00000000000..cc406858366
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineInstanceParams;
+import org.apache.doris.thrift.TTopnFilterDesc;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CoordinatorTest extends TestWithFeService {
+
+    @BeforeAll
+    public void init() throws Exception {
+        FeConstants.runningUnitTest = true;
+        createDatabase("test");
+        useDatabase("test");
+        createTable("create table tbl(id int, v int) distributed by hash(id)"
+                + " buckets 3 properties('replication_num' = '1');");
+    }
+
+    /**
+     * Verifies that FragmentExecParams.toThrift() pre-computes topnFilterDescs once and shares
+     * the same list object across all TPipelineInstanceParams when instanceExecParams.size() > 1.
+     *
+     * Before the fix, a new List was created per instance inside the loop.
+     * After the fix, one List is created outside the loop and shared by all instances.
+     */
+    @Test
+    public void testTopnFilterDescsSharedAmongInstances() throws Exception {
+        NereidsPlanner planner = plan("select * from test.tbl order by id limit 5");
+        List<TopnFilter> topnFilters = planner.getTopnFilters();
+
+        Assumptions.assumeTrue(!topnFilters.isEmpty(),
+                "Query did not generate topn filters; test skipped");
+
+        Coordinator coordinator = (Coordinator) EnvFactory.getInstance()
+                .createCoordinator(connectContext, planner, null);
+
+        // prepare() populates fragmentExecParamsMap; it is protected and accessible
+        // from this package.
+        coordinator.prepare();
+
+        // Access private fragmentExecParamsMap via reflection.
+        Field mapField = Coordinator.class.getDeclaredField("fragmentExecParamsMap");
+        mapField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Map<PlanFragmentId, Coordinator.FragmentExecParams> fragMap =
+                (Map<PlanFragmentId, Coordinator.FragmentExecParams>) mapField.get(coordinator);
+
+        Assertions.assertFalse(fragMap.isEmpty());
+
+        // Pick any fragment and inject 3 fake instances on the same host so that
+        // toThrift() groups them into one TPipelineFragmentParams with 3 local entries.
+        Coordinator.FragmentExecParams fragParams = fragMap.values().iterator().next();
+        fragParams.instanceExecParams.clear();
+
+        TNetworkAddress host = new TNetworkAddress("127.0.0.1", 9060);
+        fragParams.instanceExecParams.add(
+                new Coordinator.FInstanceExecParam(new TUniqueId(1L, 1L), host, fragParams));
+        fragParams.instanceExecParams.add(
+                new Coordinator.FInstanceExecParam(new TUniqueId(1L, 2L), host, fragParams));
+        fragParams.instanceExecParams.add(
+                new Coordinator.FInstanceExecParam(new TUniqueId(1L, 3L), host, fragParams));
+
+        // toThrift() is package-private and accessible from this package.
+        Map<TNetworkAddress, TPipelineFragmentParams> result = fragParams.toThrift(0);
+
+        TPipelineFragmentParams pipelineParams = result.get(host);
+        Assertions.assertNotNull(pipelineParams);
+
+        List<TPipelineInstanceParams> localParamsList = pipelineParams.getLocalParams();
+        Assertions.assertEquals(3, localParamsList.size());
+
+        // Every instance must have topn_filter_descs set and non-empty.
+        for (TPipelineInstanceParams lp : localParamsList) {
+            Assertions.assertNotNull(lp.getTopnFilterDescs());
+            Assertions.assertFalse(lp.getTopnFilterDescs().isEmpty());
+        }
+
+        // All instances must reference the SAME list object (not merely equal copies).
+        // This is the invariant introduced by the commit: the list is built once outside
+        // the instance loop and shared across all instances.
+        List<TTopnFilterDesc> shared = localParamsList.get(0).getTopnFilterDescs();
+        for (int i = 1; i < localParamsList.size(); i++) {
+            Assertions.assertSame(shared, localParamsList.get(i).getTopnFilterDescs(),
+                    "instance " + i + " must share the same topnFilterDescs list object");
+        }
+    }
+
+    private NereidsPlanner plan(String sql) throws IOException {
+        connectContext.getSessionVariable().setDisableNereidsRules(
+                "PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
+        connectContext.getSessionVariable().topNLazyMaterializationThreshold = -1;
+        // The test table is empty (0 rows). The topn-filter condition is:
+        //   Math.max(rowCount, 1) * topnFilterRatio > limit
+        // With default ratio=0.5: Math.max(0,1)*0.5=0.5 which is NOT > 5, so no filter.
+        // Set ratio > 5 so that even an empty table (rowCount=0) passes the check.
+        connectContext.getSessionVariable().topnFilterRatio = 10.0;
+        connectContext.setThreadLocalInfo();
+        UUID uuid = UUID.randomUUID();
+        connectContext.setQueryId(
+                new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
+        return PlanChecker.from(connectContext).plan(sql);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to