This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 398856231f0 [Fix](query cache) support partition-based instance 
parallelism (#60974)
398856231f0 is described below

commit 398856231f07be086ef9f1ea30d968e030f9d149
Author: 924060929 <[email protected]>
AuthorDate: Tue Mar 17 20:16:40 2026 +0800

    [Fix](query cache) support partition-based instance parallelism (#60974)
    
    ### What problem does this PR solve?
    
    When total tablets are much larger than pipeline capacity,
    one-tablet-per-instance planning creates excessive BE concurrency
    pressure in query-cache workloads.
    
    Trigger partition-based planning when:
      total_tablets > parallel_pipeline_task_num * participating_be_num
    
    Before:
      instance_num ~= total_tablets
    After:
      instance_num ~= partitions_on_each_be
    
    Per-BE planning example:
      BE1 tablets: p1[t1,t2], p2[t3]     -> instances: [p1:t1,t2], [p2:t3]
    BE2 tablets: p1[t4], p2[t5,t6] -> instances: [p1:t4], [p2:t5,t6]
    
    This keeps tablets from the same partition in one instance and separates
    different partitions into different instances. If partition mapping is
    incomplete
    or partition planning fails, fallback to default planning for
    correctness.
    
    Tests:
    - partition-based planning path
    - fallback-to-default path (incomplete mapping)
    - non-query-cache default planning path
---
 .../job/UnassignedScanSingleOlapTableJob.java      | 141 ++++++++
 .../job/UnassignedScanSingleOlapTableJobTest.java  | 380 +++++++++++++++++++++
 .../cache/test_partition_instance_query_cache.out  |   6 +
 .../test_partition_instance_query_cache.groovy     | 152 +++++++++
 4 files changed, 679 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
index 649e2fa9bb2..fa72f8c0105 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -25,16 +28,27 @@ import 
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TScanRangeParams;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /** UnassignedScanSingleOlapTableJob */
 public class UnassignedScanSingleOlapTableJob extends 
AbstractUnassignedScanJob {
+    private static final Logger LOG = 
LogManager.getLogger(UnassignedScanSingleOlapTableJob.class);
+
     private OlapScanNode olapScanNode;
     private final ScanWorkerSelector scanWorkerSelector;
 
@@ -81,9 +95,136 @@ public class UnassignedScanSingleOlapTableJob extends 
AbstractUnassignedScanJob
         //        instance 5: olapScanNode1: ScanRanges([tablet_10007])
         //    ],
         // }
+        if (usePartitionParallelismForQueryCache(workerToScanRanges, 
distributeContext)) {
+            try {
+                // Best effort optimization for query cache: keep tablets in 
same partition
+                // on the same instance to reduce BE concurrency pressure.
+                List<AssignedJob> partitionInstances = 
insideMachineParallelizationByPartition(workerToScanRanges);
+                if (partitionInstances != null) {
+                    return partitionInstances;
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to assign query cache instances by partition, 
fallback to default planning",
+                        e);
+            }
+        }
+
         return super.insideMachineParallelization(workerToScanRanges, 
inputJobs, distributeContext);
     }
 
+    private List<AssignedJob> insideMachineParallelizationByPartition(
+            Map<DistributedPlanWorker, UninstancedScanSource> 
workerToScanRanges) {
+        List<Long> selectedPartitionIds = new 
ArrayList<>(olapScanNode.getSelectedPartitionIds());
+        Map<Long, Long> tabletToPartitionId = 
buildTabletToPartitionId(selectedPartitionIds);
+        if (tabletToPartitionId.size() != 
olapScanNode.getScanTabletIds().size()) {
+            return null;
+        }
+
+        ConnectContext context = statementContext.getConnectContext();
+        List<AssignedJob> instances = new ArrayList<>();
+        for (Map.Entry<DistributedPlanWorker, UninstancedScanSource> entry : 
workerToScanRanges.entrySet()) {
+            DistributedPlanWorker worker = entry.getKey();
+            ScanSource scanSource = entry.getValue().scanSource;
+            if (!(scanSource instanceof DefaultScanSource)) {
+                return null;
+            }
+
+            DefaultScanSource defaultScanSource = (DefaultScanSource) 
scanSource;
+            ScanRanges scanRanges = 
defaultScanSource.scanNodeToScanRanges.get(olapScanNode);
+            if (scanRanges == null) {
+                return null;
+            }
+            if (scanRanges.params.isEmpty()) {
+                continue;
+            }
+
+            Map<Long, ScanRanges> partitionToScanRanges = 
splitScanRangesByPartition(scanRanges, tabletToPartitionId);
+            if (partitionToScanRanges == null) {
+                return null;
+            }
+
+            // One partition on one BE maps to one instance. Different BEs may 
miss some partitions.
+            for (Long partitionId : selectedPartitionIds) {
+                ScanRanges partitionScanRanges = 
partitionToScanRanges.remove(partitionId);
+                if (partitionScanRanges == null || 
partitionScanRanges.params.isEmpty()) {
+                    continue;
+                }
+                instances.add(assignWorkerAndDataSources(
+                        instances.size(), context.nextInstanceId(), worker,
+                        new DefaultScanSource(ImmutableMap.of(olapScanNode, 
partitionScanRanges))));
+            }
+
+            if (!partitionToScanRanges.isEmpty()) {
+                return null;
+            }
+        }
+        return instances;
+    }
+
+    private boolean usePartitionParallelismForQueryCache(
+            Map<DistributedPlanWorker, UninstancedScanSource> 
workerToScanRanges,
+            DistributeContext distributeContext) {
+        if (fragment.queryCacheParam == null || workerToScanRanges.isEmpty()) {
+            return false;
+        }
+
+        ConnectContext context = statementContext.getConnectContext();
+        if (context == null || 
useLocalShuffleToAddParallel(distributeContext)) {
+            return false;
+        }
+
+        long totalTabletNum = olapScanNode.getScanTabletIds().size();
+        int parallelPipelineTaskNum = Math.max(
+                context.getSessionVariable().getParallelExecInstanceNum(
+                        olapScanNode.getScanContext().getClusterName()), 1);
+        long threshold = (long) parallelPipelineTaskNum * 
workerToScanRanges.size();
+        return totalTabletNum > threshold;
+    }
+
+    private Map<Long, Long> buildTabletToPartitionId(List<Long> 
selectedPartitionIds) {
+        long selectedIndexId = olapScanNode.getSelectedIndexId();
+        if (selectedIndexId == -1) {
+            selectedIndexId = olapScanNode.getOlapTable().getBaseIndexId();
+        }
+
+        Set<Long> scanTabletIds = new 
LinkedHashSet<>(olapScanNode.getScanTabletIds());
+        Map<Long, Long> tabletToPartitionId = new 
LinkedHashMap<>(scanTabletIds.size());
+        for (Long partitionId : selectedPartitionIds) {
+            Partition partition = 
olapScanNode.getOlapTable().getPartition(partitionId);
+            if (partition == null) {
+                continue;
+            }
+            MaterializedIndex index = partition.getIndex(selectedIndexId);
+            if (index == null) {
+                continue;
+            }
+            for (Tablet tablet : index.getTablets()) {
+                long tabletId = tablet.getId();
+                if (scanTabletIds.contains(tabletId)) {
+                    tabletToPartitionId.put(tabletId, partitionId);
+                }
+            }
+        }
+        return tabletToPartitionId;
+    }
+
+    private Map<Long, ScanRanges> splitScanRangesByPartition(
+            ScanRanges scanRanges, Map<Long, Long> tabletToPartitionId) {
+        Map<Long, ScanRanges> partitionToScanRanges = new LinkedHashMap<>();
+        for (int i = 0; i < scanRanges.params.size(); i++) {
+            TScanRangeParams scanRangeParams = scanRanges.params.get(i);
+            long tabletId = 
scanRangeParams.getScanRange().getPaloScanRange().getTabletId();
+            Long partitionId = tabletToPartitionId.get(tabletId);
+            if (partitionId == null) {
+                return null;
+            }
+            partitionToScanRanges
+                    .computeIfAbsent(partitionId, id -> new ScanRanges())
+                    .addScanRange(scanRangeParams, scanRanges.bytes.get(i));
+        }
+        return partitionToScanRanges;
+    }
+
     @Override
     protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> 
assignedJobs,
             DistributedPlanWorkerManager workerManager, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
new file mode 100644
index 00000000000..097e7930959
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.LocalTablet;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.ScanContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.thrift.TPaloScanRange;
+import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class UnassignedScanSingleOlapTableJobTest {
+    @Test
+    public void testQueryCacheAssignByPartition() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(1, 1));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long partitionTwo = 200L;
+        long selectedIndexId = 10L;
+        Map<Long, Long> tabletToPartition = ImmutableMap.of(
+                1L, partitionOne,
+                2L, partitionOne,
+                3L, partitionOne,
+                4L, partitionTwo,
+                5L, partitionTwo,
+                6L, partitionTwo
+        );
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(Arrays.asList(partitionOne, partitionTwo));
+        
Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        
Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(tabletToPartition.keySet()));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        
Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets()).thenReturn(ImmutableList.of(
+                tablet(1L), tablet(2L), tablet(3L)
+        ));
+
+        Partition secondPartition = Mockito.mock(Partition.class);
+        MaterializedIndex secondIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition);
+        
Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex);
+        Mockito.when(secondIndex.getTablets()).thenReturn(ImmutableList.of(
+                tablet(4L), tablet(5L), tablet(6L)
+        ));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, 
DataPartition.RANDOM);
+        fragment.queryCacheParam = new TQueryCacheParam();
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        // Same partition tablets on one BE should be grouped into one 
instance.
+        workerToScanSources.put(worker1, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new 
UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+        DistributeContext distributeContext = new DistributeContext(
+                Mockito.mock(DistributedPlanWorkerManager.class),
+                true
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                distributeContext, ArrayListMultimap.create());
+
+        Assertions.assertEquals(4, assignedJobs.size());
+
+        Map<Long, Set<Set<Long>>> workerToInstanceTablets = new HashMap<>();
+        for (AssignedJob assignedJob : assignedJobs) {
+            DefaultScanSource defaultScanSource = (DefaultScanSource) 
assignedJob.getScanSource();
+            ScanRanges ranges = 
defaultScanSource.scanNodeToScanRanges.get(olapScanNode);
+            Set<Long> tabletIds = ranges.params.stream()
+                    .map(param -> 
param.getScanRange().getPaloScanRange().getTabletId())
+                    .collect(Collectors.toCollection(HashSet::new));
+            Set<Long> partitionIds = tabletIds.stream()
+                    .map(tabletToPartition::get)
+                    .collect(Collectors.toSet());
+
+            // Every instance must only contain tablets from one partition.
+            Assertions.assertEquals(1, partitionIds.size());
+
+            workerToInstanceTablets.computeIfAbsent(
+                    assignedJob.getAssignedWorker().id(), k -> new HashSet<>()
+            ).add(tabletIds);
+        }
+
+        Map<Long, Set<Set<Long>>> expected = new HashMap<>();
+        expected.put(1L, new HashSet<>(Arrays.asList(
+                new HashSet<>(Arrays.asList(1L, 2L)),
+                new HashSet<>(Arrays.asList(4L))
+        )));
+        expected.put(2L, new HashSet<>(Arrays.asList(
+                new HashSet<>(Arrays.asList(3L)),
+                new HashSet<>(Arrays.asList(5L, 6L))
+        )));
+
+        // Different partitions are split into different instances on each BE.
+        Assertions.assertEquals(expected, workerToInstanceTablets);
+    }
+
+    @Test
+    public void 
testQueryCacheFallbackToDefaultWhenPartitionMappingIncomplete() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(2, 2));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long selectedIndexId = 10L;
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        // Intentionally miss partitionTwo to trigger fallback.
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(ImmutableList.of(partitionOne));
+        
Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        
Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 
5L, 6L)));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        
Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), 
tablet(3L)));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, 
DataPartition.RANDOM);
+        fragment.queryCacheParam = new TQueryCacheParam();
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        workerToScanSources.put(worker1, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new 
UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                new 
DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true),
+                ArrayListMultimap.create());
+
+        // query cache default planning uses one instance per tablet.
+        Assertions.assertEquals(6, assignedJobs.size());
+    }
+
+    @Test
+    public void testNonQueryCacheUseDefaultPlanning() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(3, 3));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long partitionTwo = 200L;
+        long selectedIndexId = 10L;
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(Arrays.asList(partitionOne, partitionTwo));
+        
Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        
Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 
5L, 6L)));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        
Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), 
tablet(3L)));
+
+        Partition secondPartition = Mockito.mock(Partition.class);
+        MaterializedIndex secondIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition);
+        
Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex);
+        Mockito.when(secondIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(4L), tablet(5L), 
tablet(6L)));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, 
DataPartition.RANDOM);
+        // No query cache param, must use default planning.
+        fragment.setParallelExecNum(10);
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        workerToScanSources.put(worker1, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new 
UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                new 
DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true),
+                ArrayListMultimap.create());
+
+        // default planning splits by tablet count when parallelExecNum is 
large enough.
+        Assertions.assertEquals(6, assignedJobs.size());
+    }
+
+    private static Tablet tablet(long tabletId) {
+        return new LocalTablet(tabletId);
+    }
+
+    private static ScanRanges scanRanges(long... tabletIds) {
+        ScanRanges scanRanges = new ScanRanges();
+        for (long tabletId : tabletIds) {
+            TPaloScanRange paloScanRange = new TPaloScanRange();
+            paloScanRange.setTabletId(tabletId);
+            TScanRange scanRange = new TScanRange();
+            scanRange.setPaloScanRange(paloScanRange);
+            TScanRangeParams scanRangeParams = new TScanRangeParams();
+            scanRangeParams.setScanRange(scanRange);
+            scanRanges.addScanRange(scanRangeParams, 1L);
+        }
+        return scanRanges;
+    }
+
+    private static class TestWorker implements DistributedPlanWorker {
+        private final long id;
+        private final String address;
+
+        private TestWorker(long id, String address) {
+            this.id = id;
+            this.address = address;
+        }
+
+        @Override
+        public long getCatalogId() {
+            return 0;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public String address() {
+            return address;
+        }
+
+        @Override
+        public String host() {
+            return address;
+        }
+
+        @Override
+        public int port() {
+            return 0;
+        }
+
+        @Override
+        public String brpcAddress() {
+            return address;
+        }
+
+        @Override
+        public int brpcPort() {
+            return 0;
+        }
+
+        @Override
+        public boolean available() {
+            return true;
+        }
+    }
+}
diff --git 
a/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out 
b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
new file mode 100644
index 00000000000..111f7a196fa
--- /dev/null
+++ 
b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !partition_instance_query_result --
+/a     75
+/b     105
+/c     135
+/d     165
diff --git 
a/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
 
b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
new file mode 100644
index 00000000000..6d2281e3ef6
--- /dev/null
+++ 
b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partition_instance_query_cache") {
+    def tableName = "test_partition_instance"
+    def querySql = """
+        SELECT
+            url,
+            SUM(cost) AS total_cost
+        FROM ${tableName}
+        WHERE dt >= '2026-01-01'
+          AND dt < '2026-01-15'
+        GROUP BY url
+    """
+
+    sql "set enable_nereids_planner=true"
+    sql "set enable_nereids_distribute_planner=true"
+    sql "set enable_query_cache=true"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set enable_sql_cache=false"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            dt DATE,
+            user_id INT,
+            url STRING,
+            cost BIGINT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(dt, user_id)
+        PARTITION BY RANGE(dt)
+        (
+            PARTITION p20260101 VALUES LESS THAN ("2026-01-05"),
+            PARTITION p20260105 VALUES LESS THAN ("2026-01-10"),
+            PARTITION p20260110 VALUES LESS THAN ("2026-01-15")
+        )
+        DISTRIBUTED BY HASH(user_id) BUCKETS 3
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${tableName} VALUES
+        ('2026-01-01',1,'/a',10),
+        ('2026-01-01',2,'/b',20),
+        ('2026-01-02',3,'/c',30),
+        ('2026-01-03',4,'/d',40),
+
+        ('2026-01-06',1,'/a',15),
+        ('2026-01-06',2,'/b',25),
+        ('2026-01-07',3,'/c',35),
+        ('2026-01-08',4,'/d',45),
+
+        ('2026-01-11',1,'/a',50),
+        ('2026-01-11',2,'/b',60),
+        ('2026-01-12',3,'/c',70),
+        ('2026-01-13',4,'/d',80)
+    """
+
+    order_qt_partition_instance_query_result """
+        ${querySql}
+        ORDER BY url
+    """
+
+    def normalize = { rows ->
+        return rows.collect { row -> row.collect { col -> String.valueOf(col) 
}.join("|") }.sort()
+    }
+
+    def baseline = normalize(sql(querySql))
+    for (int i = 0; i < 3; i++) {
+        assertEquals(baseline, normalize(sql(querySql)))
+    }
+
+    explain {
+        sql(querySql)
+        contains("DIGEST")
+    }
+
+    def distributedRows = sql("EXPLAIN DISTRIBUTED PLAN ${querySql}")
+    def distributedPlan = distributedRows.collect { it[0].toString() 
}.join("\n")
+    assertTrue(distributedPlan.contains("UnassignedScanSingleOlapTableJob"))
+
+    def partitionMatcher = (distributedPlan =~ /partitions=(\d+)\/(\d+)/)
+    assertTrue(partitionMatcher.find())
+    int partitionCount = Integer.parseInt(partitionMatcher.group(1))
+
+    int scanFragmentBegin = distributedPlan.indexOf("fragmentJob: 
UnassignedScanSingleOlapTableJob")
+    assertTrue(scanFragmentBegin > 0)
+    def scanFragment = distributedPlan.substring(scanFragmentBegin)
+
+    int scanInstanceCount = (scanFragment =~ /StaticAssignedJob\(/).count
+    assertEquals(partitionCount, scanInstanceCount)
+
+    def instanceToTablets = [:].withDefault { [] }
+    String currentInstance = null
+    scanFragment.eachLine { line ->
+        def instanceMatcher = (line =~ /instanceId:\s*([0-9a-f\-]+)/)
+        if (instanceMatcher.find()) {
+            currentInstance = instanceMatcher.group(1)
+            instanceToTablets[currentInstance] = []
+        }
+
+        def tabletMatcher = (line =~ /tablet\s+(\d+)/)
+        if (tabletMatcher.find() && currentInstance != null) {
+            instanceToTablets[currentInstance] << tabletMatcher.group(1)
+        }
+    }
+
+    assertEquals(partitionCount, instanceToTablets.size())
+    instanceToTablets.each { _, tablets ->
+        assertTrue(tablets.size() > 0)
+    }
+
+    def tabletToInstance = [:]
+    instanceToTablets.each { instanceId, tablets ->
+        tablets.each { tabletId ->
+            tabletToInstance[tabletId] = instanceId
+        }
+    }
+
+    ["p20260101", "p20260105", "p20260110"].each { partitionName ->
+        def partitionTabletRows = sql("SHOW TABLETS FROM ${tableName} 
PARTITION(${partitionName})")
+        def partitionTabletIds = partitionTabletRows.collect { 
it[0].toString() }
+        assertTrue(partitionTabletIds.size() > 0)
+
+        partitionTabletIds.each { tabletId ->
+            assertTrue(tabletToInstance.containsKey(tabletId))
+        }
+
+        def partitionInstanceIds = partitionTabletIds.collect { tabletId -> 
tabletToInstance[tabletId] }.toSet()
+        assertEquals(1, partitionInstanceIds.size())
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to