This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 625afc83165 branch-4.1: [opt](nereids)share topnFilterDescs among 
instances #61648 (#64279)
625afc83165 is described below

commit 625afc831655235345c19aef9bb9f9961e4445a4
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 16 19:22:16 2026 +0800

    branch-4.1: [opt](nereids)share topnFilterDescs among instances #61648 
(#64279)
    
    Cherry-picked from #61648
    
    Co-authored-by: minghong <[email protected]>
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  16 ++-
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 139 +++++++++++++++++++++
 2 files changed, 149 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f55581076a1..5465c24d2a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3262,6 +3262,14 @@ public class Coordinator implements CoordInterface {
             Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
             TPlanFragment fragmentThrift = fragment.toThrift();
             fragmentThrift.query_cache_param = fragment.queryCacheParam;
+            // Pre-compute topn filter descs once; all instances share the 
same data.
+            List<TTopnFilterDesc> topnFilterDescs = null;
+            if (!topnFilters.isEmpty()) {
+                topnFilterDescs = new ArrayList<>();
+                for (TopnFilter filter : topnFilters) {
+                    topnFilterDescs.add(filter.toThrift());
+                }
+            }
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                 final FInstanceExecParam instanceExecParam = 
instanceExecParams.get(i);
                 Map<Integer, List<TScanRangeParams>> scanRanges = 
instanceExecParam.perNodeScanRanges;
@@ -3340,12 +3348,8 @@ public class Coordinator implements CoordInterface {
                 localParams.setBackendNum(backendNum++);
                 localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
                 
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
-                if (!topnFilters.isEmpty()) {
-                    List<TTopnFilterDesc> filterDescs = new ArrayList<>();
-                    for (TopnFilter filter : topnFilters) {
-                        filterDescs.add(filter.toThrift());
-                    }
-                    localParams.setTopnFilterDescs(filterDescs);
+                if (topnFilterDescs != null) {
+                    localParams.setTopnFilterDescs(topnFilterDescs);
                 }
                 if 
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
                     Set<Integer> broadCastRf = 
assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
new file mode 100644
index 00000000000..cc406858366
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineInstanceParams;
+import org.apache.doris.thrift.TTopnFilterDesc;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CoordinatorTest extends TestWithFeService {
+
+    @BeforeAll
+    public void init() throws Exception {
+        FeConstants.runningUnitTest = true;
+        createDatabase("test");
+        useDatabase("test");
+        createTable("create table tbl(id int, v int) distributed by hash(id)"
+                + " buckets 3 properties('replication_num' = '1');");
+    }
+
+    /**
+     * Verifies that FragmentExecParams.toThrift() pre-computes 
topnFilterDescs once and shares
+     * the same list object across all TPipelineInstanceParams when 
instanceExecParams.size() > 1.
+     *
+     * Before the fix, a new List was created per instance inside the loop.
+     * After the fix, one List is created outside the loop and shared by all 
instances.
+     */
+    @Test
+    public void testTopnFilterDescsSharedAmongInstances() throws Exception {
+        NereidsPlanner planner = plan("select * from test.tbl order by id 
limit 5");
+        List<TopnFilter> topnFilters = planner.getTopnFilters();
+
+        Assumptions.assumeTrue(!topnFilters.isEmpty(),
+                "Query did not generate topn filters; test skipped");
+
+        Coordinator coordinator = (Coordinator) EnvFactory.getInstance()
+                .createCoordinator(connectContext, planner, null);
+
+        // prepare() populates fragmentExecParamsMap; it is protected and 
accessible
+        // from this package.
+        coordinator.prepare();
+
+        // Access private fragmentExecParamsMap via reflection.
+        Field mapField = 
Coordinator.class.getDeclaredField("fragmentExecParamsMap");
+        mapField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Map<PlanFragmentId, Coordinator.FragmentExecParams> fragMap =
+                (Map<PlanFragmentId, Coordinator.FragmentExecParams>) 
mapField.get(coordinator);
+
+        Assertions.assertFalse(fragMap.isEmpty());
+
+        // Pick any fragment and inject 3 fake instances on the same host so 
that
+        // toThrift() groups them into one TPipelineFragmentParams with 3 
local entries.
+        Coordinator.FragmentExecParams fragParams = 
fragMap.values().iterator().next();
+        fragParams.instanceExecParams.clear();
+
+        TNetworkAddress host = new TNetworkAddress("127.0.0.1", 9060);
+        fragParams.instanceExecParams.add(
+                new Coordinator.FInstanceExecParam(new TUniqueId(1L, 1L), 
host, fragParams));
+        fragParams.instanceExecParams.add(
+                new Coordinator.FInstanceExecParam(new TUniqueId(1L, 2L), 
host, fragParams));
+        fragParams.instanceExecParams.add(
+                new Coordinator.FInstanceExecParam(new TUniqueId(1L, 3L), 
host, fragParams));
+
+        // toThrift() is package-private and accessible from this package.
+        Map<TNetworkAddress, TPipelineFragmentParams> result = 
fragParams.toThrift(0);
+
+        TPipelineFragmentParams pipelineParams = result.get(host);
+        Assertions.assertNotNull(pipelineParams);
+
+        List<TPipelineInstanceParams> localParamsList = 
pipelineParams.getLocalParams();
+        Assertions.assertEquals(3, localParamsList.size());
+
+        // Every instance must have topn_filter_descs set and non-empty.
+        for (TPipelineInstanceParams lp : localParamsList) {
+            Assertions.assertNotNull(lp.getTopnFilterDescs());
+            Assertions.assertFalse(lp.getTopnFilterDescs().isEmpty());
+        }
+
+        // All instances must reference the SAME list object (not merely equal 
copies).
+        // This is the invariant introduced by the commit: the list is built 
once outside
+        // the instance loop and shared across all instances.
+        List<TTopnFilterDesc> shared = 
localParamsList.get(0).getTopnFilterDescs();
+        for (int i = 1; i < localParamsList.size(); i++) {
+            Assertions.assertSame(shared, 
localParamsList.get(i).getTopnFilterDescs(),
+                    "instance " + i + " must share the same topnFilterDescs 
list object");
+        }
+    }
+
+    private NereidsPlanner plan(String sql) throws IOException {
+        connectContext.getSessionVariable().setDisableNereidsRules(
+                "PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
+        connectContext.getSessionVariable().topNLazyMaterializationThreshold = 
-1;
+        // The test table is empty (0 rows). The topn-filter condition is:
+        //   Math.max(rowCount, 1) * topnFilterRatio > limit
+        // With default ratio=0.5: Math.max(0,1)*0.5=0.5 which is NOT > 5, so 
no filter.
+        // Set ratio > 5 so that even an empty table (rowCount=0) passes the 
check.
+        connectContext.getSessionVariable().topnFilterRatio = 10.0;
+        connectContext.setThreadLocalInfo();
+        UUID uuid = UUID.randomUUID();
+        connectContext.setQueryId(
+                new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()));
+        return PlanChecker.from(connectContext).plan(sql);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to