This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 1126aec3b6a branch-4.1: [Fix](query cache) support partition-based
instance parallelism #60974 (#61438)
1126aec3b6a is described below
commit 1126aec3b6afd6e80262ef15c8eb686f48c5bf1e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Mar 25 14:09:32 2026 +0800
branch-4.1: [Fix](query cache) support partition-based instance parallelism
#60974 (#61438)
Cherry-picked from #60974
Co-authored-by: 924060929 <[email protected]>
---
.../job/UnassignedScanSingleOlapTableJob.java | 141 ++++++++
.../job/UnassignedScanSingleOlapTableJobTest.java | 380 +++++++++++++++++++++
.../cache/test_partition_instance_query_cache.out | 6 +
.../test_partition_instance_query_cache.groovy | 152 +++++++++
4 files changed, 679 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
index 649e2fa9bb2..fa72f8c0105 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
@@ -17,6 +17,9 @@
package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -25,16 +28,27 @@ import
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TScanRangeParams;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/** UnassignedScanSingleOlapTableJob */
public class UnassignedScanSingleOlapTableJob extends
AbstractUnassignedScanJob {
+ private static final Logger LOG =
LogManager.getLogger(UnassignedScanSingleOlapTableJob.class);
+
private OlapScanNode olapScanNode;
private final ScanWorkerSelector scanWorkerSelector;
@@ -81,9 +95,136 @@ public class UnassignedScanSingleOlapTableJob extends
AbstractUnassignedScanJob
// instance 5: olapScanNode1: ScanRanges([tablet_10007])
// ],
// }
+ if (usePartitionParallelismForQueryCache(workerToScanRanges,
distributeContext)) {
+ try {
+ // Best effort optimization for query cache: keep tablets in
same partition
+ // on the same instance to reduce BE concurrency pressure.
+ List<AssignedJob> partitionInstances =
insideMachineParallelizationByPartition(workerToScanRanges);
+ if (partitionInstances != null) {
+ return partitionInstances;
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to assign query cache instances by partition,
fallback to default planning",
+ e);
+ }
+ }
+
return super.insideMachineParallelization(workerToScanRanges,
inputJobs, distributeContext);
}
+    /**
+     * Best-effort query cache planning: creates one instance per (worker, partition) pair, so all
+     * tablets of one partition scanned on the same backend are handled by a single instance.
+     *
+     * <p>Work is done in two phases. Phase 1 validates and splits every worker's scan ranges
+     * without touching the connect context. Phase 2 allocates instance ids and builds the jobs.
+     * Returning {@code null} from phase 1 therefore never consumes instance ids.
+     *
+     * @param workerToScanRanges scan ranges selected for each worker
+     * @return the assigned instances, or {@code null} when the ranges cannot be grouped by
+     *         partition. This happens when a scanned tablet is not in a selected partition, a
+     *         worker's scan source is not a {@link DefaultScanSource}, or a worker has no ranges
+     *         for {@code olapScanNode}. The caller then falls back to the default planning.
+     */
+    private List<AssignedJob> insideMachineParallelizationByPartition(
+            Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges) {
+        List<Long> selectedPartitionIds = new ArrayList<>(olapScanNode.getSelectedPartitionIds());
+        Map<Long, Long> tabletToPartitionId = buildTabletToPartitionId(selectedPartitionIds);
+        // Every scanned tablet must resolve to a selected partition. containsAll (rather than a
+        // size comparison) keeps duplicated ids in getScanTabletIds() from forcing a fallback,
+        // because buildTabletToPartitionId de-duplicates them.
+        if (!tabletToPartitionId.keySet().containsAll(olapScanNode.getScanTabletIds())) {
+            return null;
+        }
+
+        // Phase 1: split each worker's ranges by partition, ordered by selectedPartitionIds.
+        Map<DistributedPlanWorker, List<ScanRanges>> workerToPartitionScanRanges = new LinkedHashMap<>();
+        for (Map.Entry<DistributedPlanWorker, UninstancedScanSource> entry : workerToScanRanges.entrySet()) {
+            ScanSource scanSource = entry.getValue().scanSource;
+            if (!(scanSource instanceof DefaultScanSource)) {
+                return null;
+            }
+            ScanRanges scanRanges = ((DefaultScanSource) scanSource).scanNodeToScanRanges.get(olapScanNode);
+            if (scanRanges == null) {
+                return null;
+            }
+            if (scanRanges.params.isEmpty()) {
+                continue;
+            }
+
+            Map<Long, ScanRanges> partitionToScanRanges =
+                    splitScanRangesByPartition(scanRanges, tabletToPartitionId);
+            if (partitionToScanRanges == null) {
+                return null;
+            }
+            List<ScanRanges> orderedPartitionScanRanges = new ArrayList<>();
+            for (Long partitionId : selectedPartitionIds) {
+                ScanRanges partitionScanRanges = partitionToScanRanges.remove(partitionId);
+                if (partitionScanRanges != null && !partitionScanRanges.params.isEmpty()) {
+                    orderedPartitionScanRanges.add(partitionScanRanges);
+                }
+            }
+            // Defensive: anything left over belongs to a partition outside selectedPartitionIds.
+            if (!partitionToScanRanges.isEmpty()) {
+                return null;
+            }
+            workerToPartitionScanRanges.put(entry.getKey(), orderedPartitionScanRanges);
+        }
+
+        // Phase 2: one partition on one BE maps to one instance. Different BEs may miss some partitions.
+        ConnectContext context = statementContext.getConnectContext();
+        List<AssignedJob> instances = new ArrayList<>();
+        for (Map.Entry<DistributedPlanWorker, List<ScanRanges>> entry : workerToPartitionScanRanges.entrySet()) {
+            DistributedPlanWorker worker = entry.getKey();
+            for (ScanRanges partitionScanRanges : entry.getValue()) {
+                instances.add(assignWorkerAndDataSources(
+                        instances.size(), context.nextInstanceId(), worker,
+                        new DefaultScanSource(ImmutableMap.of(olapScanNode, partitionScanRanges))));
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * Tells whether the query cache fragment should be planned with one instance per
+     * (worker, partition) instead of the default planning.
+     *
+     * <p>All of the following must hold:
+     * <ul>
+     *   <li>the fragment carries a query cache param;</li>
+     *   <li>at least one worker was selected;</li>
+     *   <li>a connect context is available;</li>
+     *   <li>local shuffle is not used to add parallelism.</li>
+     * </ul>
+     * Partition parallelism is then chosen only when the number of scanned tablets exceeds
+     * {@code max(parallelExecInstanceNum, 1) * workerCount}.
+     */
+    private boolean usePartitionParallelismForQueryCache(
+            Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
+            DistributeContext distributeContext) {
+        boolean hasQueryCache = fragment.queryCacheParam != null;
+        if (!hasQueryCache || workerToScanRanges.isEmpty()) {
+            return false;
+        }
+
+        ConnectContext connectContext = statementContext.getConnectContext();
+        if (connectContext == null) {
+            return false;
+        }
+        if (useLocalShuffleToAddParallel(distributeContext)) {
+            return false;
+        }
+
+        long scannedTabletCount = olapScanNode.getScanTabletIds().size();
+        int instancesPerWorker = Math.max(connectContext.getSessionVariable()
+                .getParallelExecInstanceNum(olapScanNode.getScanContext().getClusterName()), 1);
+        long tabletBudget = (long) instancesPerWorker * workerToScanRanges.size();
+        return scannedTabletCount > tabletBudget;
+    }
+
+    /**
+     * Maps each scanned tablet id to the selected partition that owns it.
+     *
+     * <p>Tablets are looked up in the selected materialized index, or in the base index when the
+     * selected index id is -1. Partitions or indexes that cannot be found are skipped, so the
+     * result may not cover every scanned tablet.
+     *
+     * @param selectedPartitionIds partitions to inspect, in iteration order
+     * @return insertion-ordered map from tablet id to partition id
+     */
+    private Map<Long, Long> buildTabletToPartitionId(List<Long> selectedPartitionIds) {
+        long selectedIndexId = olapScanNode.getSelectedIndexId();
+        long indexId = selectedIndexId != -1
+                ? selectedIndexId
+                : olapScanNode.getOlapTable().getBaseIndexId();
+
+        Set<Long> wantedTabletIds = new LinkedHashSet<>(olapScanNode.getScanTabletIds());
+        Map<Long, Long> result = new LinkedHashMap<>(wantedTabletIds.size());
+        for (Long partitionId : selectedPartitionIds) {
+            Partition partition = olapScanNode.getOlapTable().getPartition(partitionId);
+            MaterializedIndex index = partition == null ? null : partition.getIndex(indexId);
+            if (index == null) {
+                continue;
+            }
+            for (Tablet tablet : index.getTablets()) {
+                long tabletId = tablet.getId();
+                if (wantedTabletIds.contains(tabletId)) {
+                    result.put(tabletId, partitionId);
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Groups one worker's scan ranges by partition, keeping each range's byte size aligned with it.
+     *
+     * @param scanRanges ranges of a single worker; {@code params} and {@code bytes} are parallel lists
+     * @param tabletToPartitionId tablet id to owning partition id
+     * @return partition id to its scan ranges, in first-seen order, or {@code null} if any range
+     *         refers to a tablet missing from {@code tabletToPartitionId}
+     */
+    private Map<Long, ScanRanges> splitScanRangesByPartition(
+            ScanRanges scanRanges, Map<Long, Long> tabletToPartitionId) {
+        Map<Long, ScanRanges> grouped = new LinkedHashMap<>();
+        int rangeCount = scanRanges.params.size();
+        for (int idx = 0; idx < rangeCount; idx++) {
+            TScanRangeParams rangeParams = scanRanges.params.get(idx);
+            long tabletId = rangeParams.getScanRange().getPaloScanRange().getTabletId();
+            Long partitionId = tabletToPartitionId.get(tabletId);
+            if (partitionId == null) {
+                return null;
+            }
+            ScanRanges bucket = grouped.get(partitionId);
+            if (bucket == null) {
+                bucket = new ScanRanges();
+                grouped.put(partitionId, bucket);
+            }
+            bucket.addScanRange(rangeParams, scanRanges.bytes.get(idx));
+        }
+        return grouped;
+    }
+
@Override
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob>
assignedJobs,
DistributedPlanWorkerManager workerManager,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
new file mode 100644
index 00000000000..097e7930959
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.LocalTablet;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.ScanContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.thrift.TPaloScanRange;
+import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tests the query cache partition-parallelism planning of {@link UnassignedScanSingleOlapTableJob}.
+ *
+ * <p>Every scenario uses two backends:
+ * <ul>
+ *   <li>be1 scans tablets 1, 2 and 4;</li>
+ *   <li>be2 scans tablets 3, 5 and 6.</li>
+ * </ul>
+ * Tests install a thread-local {@link ConnectContext} and always remove it afterwards so that
+ * the context does not leak into other tests running on the same thread.
+ */
+public class UnassignedScanSingleOlapTableJobTest {
+    private static final long PARTITION_ONE = 100L;
+    private static final long PARTITION_TWO = 200L;
+    private static final long SELECTED_INDEX_ID = 10L;
+
+    // Two partitions with three tablets each; iteration order defines the selected partition order.
+    private static final Map<Long, List<Long>> TWO_PARTITIONS = ImmutableMap.<Long, List<Long>>of(
+            PARTITION_ONE, ImmutableList.of(1L, 2L, 3L),
+            PARTITION_TWO, ImmutableList.of(4L, 5L, 6L));
+
+    // Tablets reported by OlapScanNode.getScanTabletIds() in every scenario.
+    private static final List<Long> ALL_SCAN_TABLETS = ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L);
+
+    @Test
+    public void testQueryCacheAssignByPartition() {
+        try {
+            StatementContext statementContext = newStatementContext(1);
+            OlapScanNode olapScanNode = mockOlapScanNode(TWO_PARTITIONS, ALL_SCAN_TABLETS);
+            PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM);
+            fragment.queryCacheParam = new TQueryCacheParam();
+
+            // Same partition tablets on one BE should be grouped into one instance.
+            List<AssignedJob> assignedJobs = planScan(statementContext, fragment, olapScanNode);
+
+            Assertions.assertEquals(4, assignedJobs.size());
+
+            Map<Long, Long> tabletToPartition = new HashMap<>();
+            TWO_PARTITIONS.forEach((partitionId, tabletIds) ->
+                    tabletIds.forEach(tabletId -> tabletToPartition.put(tabletId, partitionId)));
+
+            Map<Long, Set<Set<Long>>> workerToInstanceTablets = new HashMap<>();
+            for (AssignedJob assignedJob : assignedJobs) {
+                DefaultScanSource defaultScanSource = (DefaultScanSource) assignedJob.getScanSource();
+                ScanRanges ranges = defaultScanSource.scanNodeToScanRanges.get(olapScanNode);
+                Set<Long> tabletIds = ranges.params.stream()
+                        .map(param -> param.getScanRange().getPaloScanRange().getTabletId())
+                        .collect(Collectors.toCollection(HashSet::new));
+                Set<Long> partitionIds = tabletIds.stream()
+                        .map(tabletToPartition::get)
+                        .collect(Collectors.toSet());
+
+                // Every instance must only contain tablets from one partition.
+                Assertions.assertEquals(1, partitionIds.size());
+
+                workerToInstanceTablets.computeIfAbsent(
+                        assignedJob.getAssignedWorker().id(), k -> new HashSet<>()
+                ).add(tabletIds);
+            }
+
+            Map<Long, Set<Set<Long>>> expected = new HashMap<>();
+            expected.put(1L, new HashSet<>(Arrays.asList(
+                    new HashSet<>(Arrays.asList(1L, 2L)),
+                    new HashSet<>(Arrays.asList(4L))
+            )));
+            expected.put(2L, new HashSet<>(Arrays.asList(
+                    new HashSet<>(Arrays.asList(3L)),
+                    new HashSet<>(Arrays.asList(5L, 6L))
+            )));
+
+            // Different partitions are split into different instances on each BE.
+            Assertions.assertEquals(expected, workerToInstanceTablets);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testQueryCacheFallbackToDefaultWhenPartitionMappingIncomplete() {
+        try {
+            StatementContext statementContext = newStatementContext(2);
+            // Intentionally miss partitionTwo (tablets 4, 5, 6) to trigger fallback.
+            OlapScanNode olapScanNode = mockOlapScanNode(
+                    ImmutableMap.<Long, List<Long>>of(PARTITION_ONE, ImmutableList.of(1L, 2L, 3L)),
+                    ALL_SCAN_TABLETS);
+            PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM);
+            fragment.queryCacheParam = new TQueryCacheParam();
+
+            List<AssignedJob> assignedJobs = planScan(statementContext, fragment, olapScanNode);
+
+            // query cache default planning uses one instance per tablet.
+            Assertions.assertEquals(6, assignedJobs.size());
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testNonQueryCacheUseDefaultPlanning() {
+        try {
+            StatementContext statementContext = newStatementContext(3);
+            OlapScanNode olapScanNode = mockOlapScanNode(TWO_PARTITIONS, ALL_SCAN_TABLETS);
+            PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM);
+            // No query cache param, must use default planning.
+            fragment.setParallelExecNum(10);
+
+            List<AssignedJob> assignedJobs = planScan(statementContext, fragment, olapScanNode);
+
+            // default planning splits by tablet count when parallelExecNum is large enough.
+            Assertions.assertEquals(6, assignedJobs.size());
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    /**
+     * Creates a connect context bound to the current thread with one pipeline task per instance,
+     * and returns a statement context attached to it. Callers must call
+     * {@link ConnectContext#remove()} when done.
+     */
+    private static StatementContext newStatementContext(long queryIdPart) {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(queryIdPart, queryIdPart));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+        return statementContext;
+    }
+
+    /**
+     * Mocks an {@link OlapScanNode} whose table exposes the given partitions.
+     *
+     * <p>Every partition in {@code partitionToTablets} is selected and owns the listed tablets in
+     * {@link #SELECTED_INDEX_ID}. Partitions that are not listed are unknown to the mocked table.
+     */
+    private static OlapScanNode mockOlapScanNode(
+            Map<Long, List<Long>> partitionToTablets, List<Long> scanTabletIds) {
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(new ArrayList<>(partitionToTablets.keySet()));
+        Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(SELECTED_INDEX_ID);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(scanTabletIds));
+
+        for (Map.Entry<Long, List<Long>> entry : partitionToTablets.entrySet()) {
+            long partitionId = entry.getKey();
+            Partition partition = Mockito.mock(Partition.class);
+            MaterializedIndex index = Mockito.mock(MaterializedIndex.class);
+            Mockito.when(olapTable.getPartition(partitionId)).thenReturn(partition);
+            Mockito.when(partition.getIndex(SELECTED_INDEX_ID)).thenReturn(index);
+            ImmutableList.Builder<Tablet> tablets = ImmutableList.builder();
+            for (Long tabletId : entry.getValue()) {
+                tablets.add(tablet(tabletId));
+            }
+            Mockito.when(index.getTablets()).thenReturn(tablets.build());
+        }
+        return olapScanNode;
+    }
+
+    /** Scan ranges shared by all scenarios: be1 scans tablets 1, 2, 4 and be2 scans 3, 5, 6. */
+    private static Map<DistributedPlanWorker, UninstancedScanSource> twoWorkerScanSources(
+            OlapScanNode olapScanNode) {
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources = new LinkedHashMap<>();
+        workerToScanSources.put(new TestWorker(1L, "be1"), new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(new TestWorker(2L, "be2"), new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+        return workerToScanSources;
+    }
+
+    /** Builds the job under test with the two-worker scan sources and returns its assigned jobs. */
+    private static List<AssignedJob> planScan(
+            StatementContext statementContext, PlanFragment fragment, OlapScanNode olapScanNode) {
+        // Built before stubbing so no other call happens in the middle of Mockito.when(...).
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources =
+                twoWorkerScanSources(olapScanNode);
+        ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(statementContext.getConnectContext())
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+        DistributeContext distributeContext = new DistributeContext(
+                Mockito.mock(DistributedPlanWorkerManager.class),
+                true
+        );
+        return unassignedJob.computeAssignedJobs(distributeContext, ArrayListMultimap.create());
+    }
+
+    private static Tablet tablet(long tabletId) {
+        return new LocalTablet(tabletId);
+    }
+
+    /** Builds scan ranges with one palo scan range (1 byte each) per tablet id. */
+    private static ScanRanges scanRanges(long... tabletIds) {
+        ScanRanges scanRanges = new ScanRanges();
+        for (long tabletId : tabletIds) {
+            TPaloScanRange paloScanRange = new TPaloScanRange();
+            paloScanRange.setTabletId(tabletId);
+            TScanRange scanRange = new TScanRange();
+            scanRange.setPaloScanRange(paloScanRange);
+            TScanRangeParams scanRangeParams = new TScanRangeParams();
+            scanRangeParams.setScanRange(scanRange);
+            scanRanges.addScanRange(scanRangeParams, 1L);
+        }
+        return scanRanges;
+    }
+
+    /** Minimal always-available worker identified by id and address. */
+    private static class TestWorker implements DistributedPlanWorker {
+        private final long id;
+        private final String address;
+
+        private TestWorker(long id, String address) {
+            this.id = id;
+            this.address = address;
+        }
+
+        @Override
+        public long getCatalogId() {
+            return 0;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public String address() {
+            return address;
+        }
+
+        @Override
+        public String host() {
+            return address;
+        }
+
+        @Override
+        public int port() {
+            return 0;
+        }
+
+        @Override
+        public String brpcAddress() {
+            return address;
+        }
+
+        @Override
+        public int brpcPort() {
+            return 0;
+        }
+
+        @Override
+        public boolean available() {
+            return true;
+        }
+    }
+}
diff --git
a/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
new file mode 100644
index 00000000000..111f7a196fa
--- /dev/null
+++
b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !partition_instance_query_result --
+/a 75
+/b 105
+/c 135
+/d 165
diff --git
a/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
new file mode 100644
index 00000000000..6d2281e3ef6
--- /dev/null
+++
b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Verifies that with the query cache enabled the scan fragment groups tablets by partition:
+// - no instance mixes tablets of different partitions;
+// - on a single backend, each partition is served by exactly one instance.
+suite("test_partition_instance_query_cache") {
+    def tableName = "test_partition_instance"
+    def partitionNames = ["p20260101", "p20260105", "p20260110"]
+    def querySql = """
+        SELECT
+            url,
+            SUM(cost) AS total_cost
+        FROM ${tableName}
+        WHERE dt >= '2026-01-01'
+          AND dt < '2026-01-15'
+        GROUP BY url
+    """
+
+    sql "set enable_nereids_planner=true"
+    sql "set enable_nereids_distribute_planner=true"
+    sql "set enable_query_cache=true"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set enable_sql_cache=false"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            dt DATE,
+            user_id INT,
+            url STRING,
+            cost BIGINT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(dt, user_id)
+        PARTITION BY RANGE(dt)
+        (
+            PARTITION p20260101 VALUES LESS THAN ("2026-01-05"),
+            PARTITION p20260105 VALUES LESS THAN ("2026-01-10"),
+            PARTITION p20260110 VALUES LESS THAN ("2026-01-15")
+        )
+        DISTRIBUTED BY HASH(user_id) BUCKETS 3
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${tableName} VALUES
+            ('2026-01-01',1,'/a',10),
+            ('2026-01-01',2,'/b',20),
+            ('2026-01-02',3,'/c',30),
+            ('2026-01-03',4,'/d',40),
+
+            ('2026-01-06',1,'/a',15),
+            ('2026-01-06',2,'/b',25),
+            ('2026-01-07',3,'/c',35),
+            ('2026-01-08',4,'/d',45),
+
+            ('2026-01-11',1,'/a',50),
+            ('2026-01-11',2,'/b',60),
+            ('2026-01-12',3,'/c',70),
+            ('2026-01-13',4,'/d',80)
+    """
+
+    order_qt_partition_instance_query_result """
+        ${querySql}
+        ORDER BY url
+    """
+
+    // Repeated executions must return the same rows as the first one.
+    def normalize = { rows ->
+        return rows.collect { row -> row.collect { col -> String.valueOf(col) }.join("|") }.sort()
+    }
+    def baseline = normalize(sql(querySql))
+    for (int i = 0; i < 3; i++) {
+        assertEquals(baseline, normalize(sql(querySql)))
+    }
+
+    explain {
+        sql(querySql)
+        contains("DIGEST")
+    }
+
+    def distributedRows = sql("EXPLAIN DISTRIBUTED PLAN ${querySql}")
+    def distributedPlan = distributedRows.collect { it[0].toString() }.join("\n")
+    assertTrue(distributedPlan.contains("UnassignedScanSingleOlapTableJob"))
+
+    def partitionMatcher = (distributedPlan =~ /partitions=(\d+)\/(\d+)/)
+    assertTrue(partitionMatcher.find())
+    int partitionCount = Integer.parseInt(partitionMatcher.group(1))
+
+    // indexOf returns -1 when the marker is missing; 0 is a valid match position.
+    int scanFragmentBegin = distributedPlan.indexOf("fragmentJob: UnassignedScanSingleOlapTableJob")
+    assertTrue(scanFragmentBegin >= 0)
+    def scanFragment = distributedPlan.substring(scanFragmentBegin)
+
+    // Collect the tablets listed under each instance of the scan fragment.
+    def instanceToTablets = [:].withDefault { [] }
+    String currentInstance = null
+    scanFragment.eachLine { line ->
+        def instanceMatcher = (line =~ /instanceId:\s*([0-9a-f\-]+)/)
+        if (instanceMatcher.find()) {
+            currentInstance = instanceMatcher.group(1)
+            instanceToTablets[currentInstance] = []
+        }
+
+        def tabletMatcher = (line =~ /tablet\s+(\d+)/)
+        if (tabletMatcher.find() && currentInstance != null) {
+            instanceToTablets[currentInstance] << tabletMatcher.group(1)
+        }
+    }
+    instanceToTablets.each { instanceId, tablets ->
+        assertTrue(tablets.size() > 0)
+    }
+
+    def tabletToInstance = [:]
+    instanceToTablets.each { instanceId, tablets ->
+        tablets.each { tabletId ->
+            tabletToInstance[tabletId] = instanceId
+        }
+    }
+
+    // Map every tablet of the table to the partition that owns it.
+    def tabletToPartition = [:]
+    partitionNames.each { partitionName ->
+        def partitionTabletIds = sql("SHOW TABLETS FROM ${tableName} PARTITION(${partitionName})")
+                .collect { it[0].toString() }
+        assertTrue(partitionTabletIds.size() > 0)
+        partitionTabletIds.each { tabletId -> tabletToPartition[tabletId] = partitionName }
+    }
+
+    // Topology-independent invariants: every tablet is scanned by some instance, and no instance
+    // mixes tablets of different partitions.
+    tabletToPartition.keySet().each { tabletId ->
+        assertTrue(tabletToInstance.containsKey(tabletId))
+    }
+    instanceToTablets.each { instanceId, tablets ->
+        assertEquals(1, tablets.collect { tabletId -> tabletToPartition[tabletId] }.toSet().size())
+    }
+
+    // With a single backend every partition must collapse into exactly one instance. With more
+    // backends, a partition's tablets may be spread over several backends (one instance per
+    // backend). Also, with 9 tablets and parallel_pipeline_task_num=3 the planner may not choose
+    // partition parallelism at all. So the strict checks only apply to single-backend clusters.
+    def backendCount = sql("SHOW BACKENDS").size()
+    if (backendCount == 1) {
+        int scanInstanceCount = (scanFragment =~ /StaticAssignedJob\(/).count
+        assertEquals(partitionCount, scanInstanceCount)
+        assertEquals(partitionCount, instanceToTablets.size())
+        partitionNames.each { partitionName ->
+            def partitionInstanceIds = tabletToPartition.findAll { it.value == partitionName }
+                    .keySet().collect { tabletId -> tabletToInstance[tabletId] }.toSet()
+            assertEquals(1, partitionInstanceIds.size())
+        }
+    } else {
+        logger.info("skip single-backend instance assertions, backend count: ${backendCount}")
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org