This is an automated email from the ASF dual-hosted git repository.

morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c4c708cd7f [chore](distribute plan) add Javadoc to 
UnassignedJob.computeAssignedJobs and related methods (#63743)
4c4c708cd7f is described below

commit 4c4c708cd7f422e49a0c32a15a4649fee72e8a1b
Author: morrySnow <[email protected]>
AuthorDate: Tue Jun 9 00:21:55 2026 +0800

    [chore](distribute plan) add Javadoc to UnassignedJob.computeAssignedJobs 
and related methods (#63743)
    
    ## Summary
    - Add comprehensive Javadoc to `UnassignedJob.computeAssignedJobs()` and
    all its overrides across 10 concrete subclasses
    - Document the two-phase parallelization strategy in
    `AbstractUnassignedScanJob` (`multipleMachinesParallelization` +
    `insideMachineParallelization`)
    - Add Javadoc to all protected helper methods: `degreeOfParallelism`,
    `assignLocalShuffleJobs`, `assignedDefaultJobs`,
    `useLocalShuffleToAddParallel`, `fillUpAssignedJobs`,
    `fillUpSingleEmptyInstance`
    - Each subclass override explains its specific logic (colocate/bucket
    join, query cache partition optimization, missing bucket fill-up, etc.)
---
 .../worker/job/AbstractUnassignedScanJob.java      | 132 +++++++++++++++++++++
 .../distribute/worker/job/UnassignedAllBEJob.java  |  18 +++
 .../distribute/worker/job/UnassignedGatherJob.java |  12 ++
 .../UnassignedGatherScanMultiRemoteTablesJob.java  |  10 ++
 .../worker/job/UnassignedGroupCommitJob.java       |   9 ++
 .../plans/distribute/worker/job/UnassignedJob.java |  16 +++
 .../worker/job/UnassignedLocalTVFSinkJob.java      |  11 ++
 .../worker/job/UnassignedQueryConstantJob.java     |   9 ++
 .../job/UnassignedScanBucketOlapTableJob.java      |  67 +++++++++++
 .../worker/job/UnassignedScanMetadataJob.java      |  19 +++
 .../job/UnassignedScanSingleOlapTableJob.java      |  35 ++++++
 .../job/UnassignedScanSingleRemoteTableJob.java    |  20 ++++
 .../worker/job/UnassignedShuffleJob.java           |  16 +++
 .../worker/job/UnassignedSpecifyInstancesJob.java  |  11 ++
 14 files changed, 385 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index 167ea3dc334..8c8a47e8444 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -47,6 +47,24 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         super(statementContext, fragment, scanNodes, exchangeToChildJob);
     }
 
+    /**
+     * Compute assigned scan jobs using a two-phase parallelization strategy:
+     * <ol>
+     *   <li><b>Cross-machine parallelization</b> ({@link 
#multipleMachinesParallelization}):
+     *       For each tablet / scan range, select the best replica and its 
hosting backend worker.
+     *       This groups scan ranges by the worker that will process them.</li>
+     *   <li><b>Intra-machine parallelization</b> ({@link 
#insideMachineParallelization}):
+     *       Within each worker, split the assigned scan ranges into one or 
more instances
+     *       based on the degree of parallelism. Supports local shuffle mode 
to further
+     *       increase parallelism without rescanning data.</li>
+     * </ol>
+     * After both phases, {@link #fillUpAssignedJobs} provides a hook for 
subclasses to
+     * supply fallback instances when no workers could be selected (e.g. all 
tablets pruned).
+     *
+     * @param distributeContext the distribute context for worker selection 
and parallelism config
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return the list of assigned scan jobs, each bound to a worker with its 
tablet ranges
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
@@ -59,6 +77,18 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         return fillUpAssignedJobs(assignedJobs, 
distributeContext.workerManager, inputJobs);
     }
 
+    /**
+     * Hook for subclasses to supply fallback instances when the normal parallelization
+     * produces an empty result. For example, when all tablets of a table have been pruned
+     * (such as {@code TABLET(1234)} with a non-existent tablet id), this method can create
+     * a single empty instance to keep the fragment alive and return an empty result set.
+     *
+     * @param assignedJobs the list produced by {@link 
#insideMachineParallelization};
+     *                     may be empty if no workers could be selected
+     * @param workerManager the worker manager used to select a random 
fallback worker
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return the (possibly augmented) list of assigned jobs; default returns 
unchanged
+     */
     protected List<AssignedJob> fillUpAssignedJobs(
             List<AssignedJob> assignedJobs,
             DistributedPlanWorkerManager workerManager,
@@ -66,9 +96,44 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         return assignedJobs;
     }
 
+    /**
+     * Cross-machine parallelization: for each tablet / scan range of the scan 
nodes
+     * in this fragment, select the best replica and its hosting {@link 
DistributedPlanWorker}.
+     * The result groups all scan ranges by the worker that will process them.
+     * <p>
+     * This is the first phase of the two-phase parallelization. The returned 
map drives
+     * the second phase ({@link #insideMachineParallelization}) where each 
worker's ranges
+     * are further split into individual instances.
+     *
+     * @param distributeContext the distribute context for worker selection 
and parallelism config
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a map from selected worker to its {@link UninstancedScanSource} 
containing
+     *         the raw scan ranges assigned to that worker, not yet split into 
instances
+     */
     protected abstract Map<DistributedPlanWorker, UninstancedScanSource> 
multipleMachinesParallelization(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs);
 
+    /**
+     * Intra-machine parallelization: for each worker, split its assigned scan 
ranges
+     * into one or more {@link AssignedJob} instances. This is the second 
phase of
+     * the two-phase parallelization, following {@link 
#multipleMachinesParallelization}.
+     * <p>
+     * For each worker entry, the method:
+     * <ol>
+     *   <li>Computes the max parallelism from the scan source (e.g. tablet 
count).</li>
+     *   <li>Determines the final instance count via {@link #degreeOfParallelism},
+     *       which in the default case caps it by the fragment's {@code parallelExecNum}
+     *       and the tablet count.</li>
+     *   <li>Splits scan ranges evenly across instances (default mode) or 
creates
+     *       local shuffle instances that share a single scan source to add
+     *       parallelism without rescanning data ({@link 
#assignLocalShuffleJobs}).</li>
+     * </ol>
+     *
+     * @param workerToScanRanges map from worker to its un-instanced scan 
ranges,
+     *                           produced by {@link 
#multipleMachinesParallelization}
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @param distributeContext the distribute context for parallelism 
configuration
+     * @return the list of assigned jobs, each bound to a worker with its 
portion of scan ranges
+     */
     protected List<AssignedJob> insideMachineParallelization(
             Map<DistributedPlanWorker, UninstancedScanSource> 
workerToScanRanges,
             ListMultimap<ExchangeNode, AssignedJob> inputJobs,
@@ -104,10 +169,33 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         return instances;
     }
 
+    /**
+     * Whether the fragment should use a serial source operator followed by 
local
+     * shuffle to add intra-machine parallelism. When true, data is first 
gathered
+     * through one exchange, then locally shuffled to multiple instances on 
the same
+     * machine, allowing parallel computation without rescanning the source 
data.
+     *
+     * @param distributeContext the distribute context; for load jobs, the 
connect
+     *                          context is passed as null to avoid serial 
source
+     * @return true if the fragment has a serial source operator and should use
+     *         local shuffle to increase parallelism
+     */
     protected boolean useLocalShuffleToAddParallel(DistributeContext 
distributeContext) {
         return fragment.useSerialSource(distributeContext.isLoadJob ? null : 
statementContext.getConnectContext());
     }
 
+    /**
+     * Split the given scan source evenly into {@code instanceNum} partitions 
and
+     * create one {@link StaticAssignedJob} per partition, all on the same 
worker.
+     * Each instance scans a disjoint subset of the tablet ranges, dividing the
+     * total scan workload among the instances.
+     *
+     * @param scanSource the full scan source (e.g. all tablets assigned to 
this worker)
+     * @param instanceNum the number of instances to split into
+     * @param instances the output list receiving newly created assigned jobs
+     * @param context the connect context for generating instance IDs
+     * @param worker the worker that will host all of the instances
+     */
     protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, 
List<AssignedJob> instances,
             ConnectContext context, DistributedPlanWorker worker) {
         // split the scanRanges to some partitions, one partition for one 
instance
@@ -127,6 +215,22 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         }
     }
 
+    /**
+     * Create local shuffle instances on the given worker. The first instance 
scans
+     * all data, and remaining instances receive an empty scan source — they 
share
+     * the first instance's scan result via local shuffle on the same BE.
+     * This avoids rescanning the same data multiple times while still adding
+     * parallelism for downstream operators (e.g. aggregation).
+     * <p>
+     * All instances share the same {@code shareScanId}, signaling to the 
backend
+     * that they belong to the same shared-scan group.
+     *
+     * @param scanSource the full scan source (all data for this worker)
+     * @param instanceNum the total number of local shuffle instances to create
+     * @param instances the output list receiving newly created {@link 
LocalShuffleAssignedJob}s
+     * @param context the connect context for generating instance IDs
+     * @param worker the worker that will host all local shuffle instances
+     */
     protected void assignLocalShuffleJobs(ScanSource scanSource, int 
instanceNum, List<AssignedJob> instances,
             ConnectContext context, DistributedPlanWorker worker) {
         // only generate one instance to scan all data, in this step
@@ -161,6 +265,25 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         }
     }
 
+    /**
+     * Compute the number of parallel instances for this fragment.
+     * The result is bounded by several constraints:
+     * <ul>
+     *   <li>If the fragment has unpartitioned data distribution, returns 
1.</li>
+     *   <li>If query cache is enabled, returns {@code maxParallel} (one 
instance per
+     *       tablet required for cache lookup).</li>
+     *   <li>If the single OLAP scan node qualifies for single-instance 
optimization
+     *       (e.g. LIMIT with no conjuncts), returns 1 to save resources.</li>
+     *   <li>If local shuffle is active, returns the fragment's {@code 
parallelExecNum}.</li>
+     *   <li>Otherwise, returns {@code min(maxParallel, max(parallelExecNum, 
1))},
+     *       i.e. capped by the actual tablet count.</li>
+     * </ul>
+     *
+     * @param maxParallel the maximum possible parallelism (e.g. total tablet 
count
+     *                    or bucket count on this worker)
+     * @param useLocalShuffleToAddParallel whether local shuffle is active
+     * @return the number of instances to create for this worker
+     */
     protected int degreeOfParallelism(int maxParallel, boolean 
useLocalShuffleToAddParallel) {
         Preconditions.checkArgument(maxParallel > 0, "maxParallel must be 
positive");
         if (!fragment.getDataPartition().isPartitioned()) {
@@ -188,6 +311,15 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 
1));
     }
 
+    /**
+     * Create a single empty instance assigned to a random available worker.
+     * Used by subclasses in {@link #fillUpAssignedJobs} as a fallback when 
normal
+     * parallelization produces no instances (e.g. all tablets/data pruned 
away),
+     * ensuring the fragment can still execute and return an empty result.
+     *
+     * @param workerManager the worker manager to select a random worker from
+     * @return a singleton list containing one empty assigned job
+     */
     protected List<AssignedJob> 
fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
         long catalogId = Env.getCurrentInternalCatalog().getId();
         if (scanNodes != null && scanNodes.size() > 0) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
index e8b30730103..fb3e1a07526 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
@@ -56,6 +56,24 @@ public class UnassignedAllBEJob extends 
AbstractUnassignedJob {
     }
 
     // ExchangeNode -> upstreamFragment -> AssignedJob(instances of 
upstreamFragment)
+    /**
+     * Compute assigned jobs that deploy one instance on every available 
backend.
+     * This is used for dictionary sink fragments where data must be loaded 
onto
+     * all BEs. Supports two loading modes:
+     * <ul>
+     *   <li><b>Full load</b>: when source data version has changed, redeploy 
to all BEs
+     *       with parallelism matching the upstream fragment instance 
count.</li>
+     *   <li><b>Partial load</b>: when only some BEs are outdated, deploy only 
to those
+     *       outdated BEs to avoid redundant work.</li>
+     * </ul>
+     * Each BE gets one instance with an empty {@link DefaultScanSource} (the 
actual
+     * scan data comes from the upstream exchange).
+     *
+     * @param distributeContext the distribute context providing the worker 
manager
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs,
+     *                  used to determine the expected instance count for full 
loads
+     * @return one assigned job per target backend
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(DistributeContext 
distributeContext,
             ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
index bd1f2779acc..3cb1b9ec858 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
@@ -40,6 +40,18 @@ public class UnassignedGatherJob extends 
AbstractUnassignedJob {
         super(statementContext, fragment, ImmutableList.of(), 
exchangeToChildJob);
     }
 
+    /**
+     * Compute assigned jobs for a gather (single-node) fragment.
+     * All instances are placed on a single randomly selected worker.
+     * When {@code useSerialSource} is true, multiple local shuffle instances
+     * are created on the same worker to add intra-machine parallelism:
+     * the first instance scans all data from the upstream exchange and
+     * local-shuffles it to the other local instances for parallel processing.
+     *
+     * @param distributeContext the distribute context for worker selection
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return one or more assigned jobs, all on the same selected worker
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
index f3d260e289d..7045ceb9748 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
@@ -61,6 +61,16 @@ public class UnassignedGatherScanMultiRemoteTablesJob 
extends AbstractUnassigned
         return true;
     }
 
+    /**
+     * Compute a single assigned job that gathers scan ranges from all
+     * {@link org.apache.doris.planner.DataGenScanNode} sources in this 
fragment.
+     * All scan ranges from each DataGenScanNode are collected into one
+     * {@link DefaultScanSource} and placed on a randomly selected worker.
+     *
+     * @param distributeContext the distribute context for worker selection
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a list containing exactly one assigned job with all scan ranges 
merged
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
index d4f32cce896..ca29f0a7689 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
@@ -39,6 +39,15 @@ public class UnassignedGroupCommitJob extends 
AbstractUnassignedJob {
         super(statementContext, fragment, scanNodes, exchangeToChildJob);
     }
 
+    /**
+     * Compute a single assigned job bound to the group commit merge backend.
+     * The target backend is determined by {@link 
StatementContext#getGroupCommitMergeBackend()},
+     * ensuring the group commit sink executes on the specific BE designated 
for merging.
+     *
+     * @param distributeContext the distribute context (unused — worker is 
fixed by group commit logic)
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a list containing exactly one assigned job on the group commit 
merge backend
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
index a5d6331440a..2830b1d1d9c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
@@ -43,6 +43,22 @@ public interface UnassignedJob extends 
TreeNode<UnassignedJob> {
 
     ListMultimap<ExchangeNode, UnassignedJob> getExchangeToChildJob();
 
+    /**
+     * Compute and return the list of {@link AssignedJob}s for this fragment.
+     * This is the core method that transforms an unassigned fragment-level 
job into
+     * concrete parallel instances, each bound to a specific {@link 
DistributedPlanWorker}
+     * and carrying its assigned {@link ScanSource} (data ranges).
+     *
+     * @param distributeContext
+     *         the distribute context containing worker manager, selected 
workers, and other
+     *         planner state needed for worker selection and parallelism 
decisions
+     * @param inputJobs
+     *         multimap from child {@link ExchangeNode} to their 
already-assigned jobs;
+     *         provides the child fragment instance layout used by 
shuffle/gather jobs
+     *         to determine their own instance count and worker placement
+     * @return the list of assigned jobs, each representing one fragment 
instance scheduled
+     *         on a specific worker with its data source
+     */
     List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
index abe804dc170..a9b259de3f7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
@@ -48,6 +48,17 @@ public class UnassignedLocalTVFSinkJob extends 
AbstractUnassignedJob {
         this.backendId = backendId;
     }
 
+    /**
+     * Compute a single assigned job on the designated backend for local TVF 
sink.
+     * The target backend is determined by {@code backendId}. If the specified 
backend
+     * is not alive, an {@link IllegalStateException} is thrown. This ensures
+     * INSERT INTO local(...) writes to the correct node's local disk.
+     *
+     * @param distributeContext the distribute context (unused — worker is 
fixed by backendId)
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a list containing exactly one assigned job on the designated 
backend
+     * @throws IllegalStateException if the target backend is not available
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
index 4c9fb15a2b7..735760553c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
@@ -37,6 +37,15 @@ public class UnassignedQueryConstantJob extends 
AbstractUnassignedJob {
         super(statementContext, fragment, ImmutableList.of(), 
ArrayListMultimap.create());
     }
 
+    /**
+     * Compute a single assigned job on a randomly selected worker for 
constant queries
+     * (e.g. SELECT 1, SELECT * FROM VALUES(...)). Such queries have no data 
scan,
+     * so a single instance with an empty {@link DefaultScanSource} suffices.
+     *
+     * @param distributeContext the distribute context for random worker 
selection
+     * @param inputJobs unused — constant queries have no child fragments
+     * @return a list containing exactly one assigned job on a random worker
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index b6a450c93a1..c4a08a339ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -86,6 +86,18 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         return olapScanNodes;
     }
 
+    /**
+     * Select a replica and its hosting worker for each bucket's tablets 
across all
+     * OLAP scan nodes in this fragment, grouping by bucket index. This is the 
key
+     * mechanism for bucket-shuffle join and colocate join: tablets belonging 
to the
+     * same bucket index across different tables are co-located on the same 
worker,
+     * enabling local join without data shuffling.
+     *
+     * @param distributeContext the distribute context
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a map from worker to its {@link UninstancedScanSource} keyed by 
bucket index,
+     *         e.g. {@code {BackendWorker("172.0.0.1") -> {bucket 0: 
{olapScanNode1: [...], olapScanNode2: [...]}}}}
+     */
     @Override
     protected Map<DistributedPlanWorker, UninstancedScanSource> 
multipleMachinesParallelization(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
@@ -112,6 +124,23 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         );
     }
 
+    /**
+     * Split each worker's assigned buckets into one or more instances, then 
fill up
+     * missing bucket instances when needed for outer join or non-intersect 
set operations
+     * in bucket-shuffle mode.
+     * <p>
+     * After the default even-split from {@link 
AbstractUnassignedScanJob#insideMachineParallelization},
+     * this method checks whether the fragment contains right outer join, full 
outer join,
+     * semi/anti join, or non-intersect set operations that use bucket 
shuffle. If a bucket
+     * index has no left-side data (e.g. due to tablet pruning), a placeholder 
instance is
+     * created for that bucket so the right-side data still has a destination 
to be shuffled to,
+     * preventing the join from silently dropping rows.
+     *
+     * @param workerToScanRanges map from worker to its un-instanced bucket 
ranges
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @param distributeContext the distribute context for parallelism 
configuration
+     * @return the list of assigned jobs, with missing bucket placeholders 
filled in
+     */
     @Override
     protected List<AssignedJob> insideMachineParallelization(
             Map<DistributedPlanWorker, UninstancedScanSource> 
workerToScanRanges,
@@ -177,6 +206,29 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         return assignedJobs;
     }
 
+    /**
+     * Creates local shuffle instances for bucket-based join fragments.
+     * Handles two scenarios:
+     * <ol>
+     *   <li><b>All serial</b>: all scan nodes use serial source. Only the 
first
+     *       instance scans, others share via local shuffle. Each instance is
+     *       assigned a subset of bucket indexes for join processing.</li>
+     *   <li><b>Mixed serial/non-serial</b>: some scan nodes are serial (e.g.
+     *       multi-partition table) and some are not. The serial scan source is
+     *       merged into the first instance, while non-serial sources are
+     *       parallelized normally. All instances use
+     *       {@link LocalShuffleBucketJoinAssignedJob} which carries the 
specific
+     *       bucket indexes to join.</li>
+     * </ol>
+     * Any remaining slots (when {@code instanceNum} exceeds the number of
+     * bucket groups) are filled with empty instances that have no join 
buckets.
+     *
+     * @param scanSource the bucket scan source to distribute
+     * @param instanceNum the target number of instances for this worker
+     * @param instances the output list receiving newly created assigned jobs
+     * @param context the connect context for generating instance IDs
+     * @param worker the worker that will host all instances
+     */
     @Override
     protected void assignLocalShuffleJobs(ScanSource scanSource, int 
instanceNum, List<AssignedJob> instances,
             ConnectContext context, DistributedPlanWorker worker) {
@@ -506,6 +558,21 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         return workers;
     }
 
+    /**
+     * Compute parallelism for bucket-based scan fragments.
+     * In addition to the base class constraints, this override introduces a
+     * tablet-count-based strategy for pure colocate scan (no exchange nodes):
+     * parallelism is derived from the total tablet count, capped by the
+     * session variable {@code colocateMaxParallelNum} (default 128).
+     * <p>
+     * When exchange nodes are present (e.g. bucket shuffle join), falls back
+     * to {@link AbstractUnassignedScanJob#degreeOfParallelism} to avoid
+     * over-parallelizing the join fragment.
+     *
+     * @param maxParallel the maximum possible parallelism (bucket count)
+     * @param useLocalShuffleToAddParallel whether local shuffle is active
+     * @return the number of instances to create per worker
+     */
     @Override
     protected int degreeOfParallelism(int maxParallel, boolean 
useLocalShuffleToAddParallel) {
         Preconditions.checkArgument(maxParallel > 0, "maxParallel must be 
positive");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
index aab0f20895d..8ff7092300f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
@@ -48,6 +48,15 @@ public class UnassignedScanMetadataJob extends 
AbstractUnassignedScanJob {
         this.schemaScanNode = schemaScanNode;
     }
 
+    /**
+     * Select a worker for the schema metadata scan node (e.g. 
information_schema tables).
+     * Metadata scans are typically lightweight and produce a single scan 
range per node;
+     * this method distributes them across available workers for load 
balancing.
+     *
+     * @param distributeContext the distribute context
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a map from worker to its assigned schema scan ranges
+     */
     @Override
     protected Map<DistributedPlanWorker, UninstancedScanSource> 
multipleMachinesParallelization(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
@@ -56,6 +65,16 @@ public class UnassignedScanMetadataJob extends 
AbstractUnassignedScanJob {
         );
     }
 
+    /**
+     * If no workers could be selected for the metadata scan (e.g. all 
backends are
+     * unavailable), create a single empty instance on a random available 
worker
+     * as a fallback to prevent query failure.
+     *
+     * @param assignedJobs the list produced by {@link 
#insideMachineParallelization}
+     * @param workerManager the worker manager to select a fallback worker from
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return the original list if non-empty, otherwise a single empty 
instance
+     */
     @Override
     protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> 
assignedJobs,
             DistributedPlanWorkerManager workerManager, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
index fa72f8c0105..f6d1694d855 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
@@ -62,6 +62,16 @@ public class UnassignedScanSingleOlapTableJob extends 
AbstractUnassignedScanJob
         this.olapScanNode = olapScanNode;
     }
 
+    /**
+     * Select a replica and its hosting worker for every tablet of the OLAP 
scan node,
+     * without bucket awareness. Each tablet is assigned to the best available 
backend
+     * holding a replica, and tablets on the same backend are grouped together.
+     *
+     * @param distributeContext the distribute context
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a map from worker to its assigned tablets (as {@link 
UninstancedScanSource}),
+     *         e.g. {@code {BackendWorker("172.0.0.1") -> 
[tablet_10001..10004]}}
+     */
     @Override
     protected Map<DistributedPlanWorker, UninstancedScanSource> 
multipleMachinesParallelization(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
@@ -78,6 +88,20 @@ public class UnassignedScanSingleOlapTableJob extends 
AbstractUnassignedScanJob
         );
     }
 
+    /**
+     * For each worker, split its assigned tablets into one or more instances.
+     * When the fragment uses query cache and the tablet count exceeds the 
threshold,
+     * a partition-based grouping strategy is attempted first: tablets 
belonging to
+     * the same partition are kept within the same instance to reduce backend
+     * concurrency pressure during cache lookup. If partition-based grouping 
is not
+     * applicable or fails, falls back to the default even-split strategy from
+     * {@link AbstractUnassignedScanJob#insideMachineParallelization}.
+     *
+     * @param workerToScanRanges map from worker to its un-instanced tablet 
ranges
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @param distributeContext the distribute context for parallelism 
configuration
+     * @return the list of assigned jobs, each bound to a worker with its 
tablet portion
+     */
     @Override
     protected List<AssignedJob> insideMachineParallelization(
             Map<DistributedPlanWorker, UninstancedScanSource> 
workerToScanRanges,
@@ -225,6 +249,17 @@ public class UnassignedScanSingleOlapTableJob extends 
AbstractUnassignedScanJob
         return partitionToScanRanges;
     }
 
+    /**
+     * If the normal parallelization produced an empty list (e.g. all tablets 
have been
+     * pruned by a TABLET() hint specifying a non-existent tablet), create a 
single empty
+     * instance on a random worker so the fragment can still execute and 
return an empty
+     * result set rather than failing.
+     *
+     * @param assignedJobs the list produced by {@link 
#insideMachineParallelization}
+     * @param workerManager the worker manager to select a fallback worker from
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return the original list if non-empty, otherwise a single empty 
instance
+     */
     @Override
     protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> 
assignedJobs,
             DistributedPlanWorkerManager workerManager, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
index bc98119d939..e9dfc735880 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
@@ -48,6 +48,16 @@ public class UnassignedScanSingleRemoteTableJob extends 
AbstractUnassignedScanJo
         this.scanWorkerSelector = Objects.requireNonNull(scanWorkerSelector, 
"scanWorkerSelector is not null");
     }
 
+    /**
+     * Select a worker for each scan range of the external / remote table scan 
node.
+     * For external tables (Hive, Iceberg, etc.), scan ranges represent file 
splits
+     * rather than tablets, and workers are selected based on data locality or
+     * workload balancing.
+     *
+     * @param distributeContext the distribute context
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return a map from worker to its assigned file scan ranges
+     */
     @Override
     protected Map<DistributedPlanWorker, UninstancedScanSource> 
multipleMachinesParallelization(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
@@ -56,6 +66,16 @@ public class UnassignedScanSingleRemoteTableJob extends 
AbstractUnassignedScanJo
         );
     }
 
+    /**
+     * If all file scan ranges have been pruned and the assigned job list is 
empty,
+     * create a single empty instance on a random worker so the fragment can 
still
+     * execute (returning an empty result) rather than failing.
+     *
+     * @param assignedJobs the list produced by {@link 
#insideMachineParallelization}
+     * @param workerManager the worker manager to select a fallback worker from
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return the original list if non-empty, otherwise a single empty 
instance
+     */
     @Override
     protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> 
assignedJobs,
             DistributedPlanWorkerManager workerManager, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index 27792eb288e..40c90563730 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -50,6 +50,22 @@ public class UnassignedShuffleJob extends 
AbstractUnassignedJob {
         super(statementContext, fragment, ImmutableList.of(), 
exchangeToChildJob);
     }
 
+    /**
+     * Compute assigned jobs for a shuffle (data redistribution) fragment.
+     * The instance count is determined by the parallelism of the largest child
+     * fragment. When the expected instance count is lower than that child's instance count
+     * (e.g. due to session variable limits or query cache constraints), 
workers
+     * are shuffled to spread instances across different backends for load 
balancing.
+     * When more instances are needed, worker assignment follows the child 
layout.
+     * <p>
+     * If {@code useSerialSource} is true, multiple local shuffle instances are
+     * created per worker to add intra-machine parallelism without rescanning 
data.
+     *
+     * @param distributeContext the distribute context for worker selection
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs,
+     *                  used to determine the largest child fragment's 
instance layout
+     * @return assigned shuffle jobs with workers selected from child fragment 
layout
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
index 6ded32e0cd9..ca3fe1c4aae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
@@ -42,6 +42,17 @@ public class UnassignedSpecifyInstancesJob extends 
AbstractUnassignedJob {
         this.specifyInstances = fragment.specifyInstances.get();
     }
 
+    /**
+     * Compute assigned jobs by delegating to the fragment's
+     * {@link NereidsSpecifyInstances}. This is used when the fragment has
+     * pre-specified instance-to-worker mappings (e.g. from hints or
+     * statement-level instance specifications), bypassing the normal
+     * worker selection and parallelization logic.
+     *
+     * @param distributeContext the distribute context (forwarded to 
{@link NereidsSpecifyInstances})
+     * @param inputJobs multimap from child exchange nodes to their assigned 
jobs
+     * @return assigned jobs built from the pre-specified instance layout
+     */
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to