This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 31557ff76c2 branch-4.0: [opt](nereids) support random local shuffle 
for union #58042 (#58555)
31557ff76c2 is described below

commit 31557ff76c24e17843009dd1c7e18d7db5caeb21
Author: 924060929 <[email protected]>
AuthorDate: Tue Dec 2 10:02:07 2025 +0800

    branch-4.0: [opt](nereids) support random local shuffle for union #58042 
(#58555)
    
    picked from #58042
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  23 +++-
 be/src/pipeline/exec/exchange_sink_operator.h      |   1 +
 .../glue/translator/PhysicalPlanTranslator.java    |  46 +++++++
 .../worker/job/UnassignedJobBuilder.java           |   6 +-
 .../worker/job/UnassignedLocalShuffleUnionJob.java |  99 +++++++++++++++
 .../org/apache/doris/planner/DataPartition.java    |   3 +-
 .../org/apache/doris/planner/ExchangeNode.java     |  13 +-
 .../java/org/apache/doris/planner/UnionNode.java   |  10 ++
 .../nereids/distribute/LocalShuffleUnionTest.java  | 141 +++++++++++++++++++++
 gensrc/thrift/Partitions.thrift                    |   3 +
 10 files changed, 339 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71a0edf31b7..be65e5f176d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -104,6 +104,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     for (int i = 0; i < channels.size(); ++i) {
         if (channels[i]->is_local()) {
             local_size++;
+            local_channel_ids.emplace_back(i);
             _last_local_channel_idx = i;
         }
     }
@@ -286,7 +287,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
            sink.output_partition.type == 
TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
            sink.output_partition.type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
            sink.output_partition.type == 
TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
-           sink.output_partition.type == 
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
+           sink.output_partition.type == 
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED ||
+           sink.output_partition.type == TPartitionType::RANDOM_LOCAL_SHUFFLE);
 #endif
     _name = "ExchangeSinkOperatorX";
     _pool = std::make_shared<ObjectPool>();
@@ -493,6 +495,25 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             }
         }
         local_state.current_channel_idx = (local_state.current_channel_idx + 
1) % _writer_count;
+    } else if (_part_type == TPartitionType::RANDOM_LOCAL_SHUFFLE) {
+        DCHECK_LT(local_state.current_channel_idx, 
local_state.local_channel_ids.size())
+                << "local_state.current_channel_idx: " << 
local_state.current_channel_idx
+                << ", local_channel_ids: " << 
to_string(local_state.local_channel_ids);
+
+        // 1. select channel
+        auto& current_channel =
+                local_state
+                        
.channels[local_state.local_channel_ids[local_state.current_channel_idx]];
+        DCHECK(current_channel->is_local())
+                << "Only local channel are supported, current_channel_idx: "
+                << 
local_state.local_channel_ids[local_state.current_channel_idx];
+        if (!current_channel->is_receiver_eof()) {
+            // 2. serialize, send and rollover block
+            auto status = current_channel->send_local_block(block, eos, true);
+            HANDLE_CHANNEL_STATUS(state, current_channel, status);
+        }
+        local_state.current_channel_idx =
+                (local_state.current_channel_idx + 1) % 
local_state.local_channel_ids.size();
     } else {
         // Range partition
         // 1. calculate range
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 6900c5491b4..37651f268ea 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -106,6 +106,7 @@ public:
     std::vector<std::shared_ptr<vectorized::Channel>> channels;
     int current_channel_idx {0}; // index of current channel to send to if 
_random == true
     bool _only_local_exchange {false};
+    std::vector<uint32_t> local_channel_ids;
 
     void on_channel_finished(InstanceLoId channel_id);
     vectorized::PartitionerBase* partitioner() const { return 
_partitioner.get(); }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 731dbc64fc1..c7b79c80c72 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -186,6 +186,7 @@ import 
org.apache.doris.planner.BackendPartitionedSchemaScanNode;
 import org.apache.doris.planner.BlackholeSink;
 import org.apache.doris.planner.CTEScanNode;
 import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.DictionarySink;
 import org.apache.doris.planner.EmptySetNode;
@@ -246,6 +247,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -2302,6 +2304,36 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             setOperationNode.setColocate(true);
         }
 
+        // whether accept LocalShuffleUnion.
+        // the backend need `enable_local_exchange=true` to compute whether a 
channel is `local`,
+        // and LocalShuffleUnion need `local` channels to do random local 
shuffle, so we need check
+        // `enable_local_exchange`
+        if (setOperation instanceof PhysicalUnion
+                && 
context.getConnectContext().getSessionVariable().getEnableLocalExchange()) {
+            boolean isLocalShuffleUnion = false;
+            if (setOperation.getPhysicalProperties().getDistributionSpec() 
instanceof DistributionSpecExecutionAny) {
+                Map<Integer, ExchangeNode> exchangeIdToExchangeNode = new 
IdentityHashMap<>();
+                for (PlanNode child : setOperationNode.getChildren()) {
+                    if (child instanceof ExchangeNode) {
+                        exchangeIdToExchangeNode.put(child.getId().asInt(), 
(ExchangeNode) child);
+                    }
+                }
+
+                for (PlanFragment childFragment : 
setOperationFragment.getChildren()) {
+                    DataSink sink = childFragment.getSink();
+                    if (sink instanceof DataStreamSink) {
+                        isLocalShuffleUnion |= 
setLocalRandomPartition(exchangeIdToExchangeNode, (DataStreamSink) sink);
+                    } else if (sink instanceof MultiCastDataSink) {
+                        MultiCastDataSink multiCastDataSink = 
(MultiCastDataSink) sink;
+                        for (DataStreamSink dataStreamSink : 
multiCastDataSink.getDataStreamSinks()) {
+                            isLocalShuffleUnion |= 
setLocalRandomPartition(exchangeIdToExchangeNode, dataStreamSink);
+                        }
+                    }
+                }
+            }
+            ((UnionNode) 
setOperationNode).setLocalShuffleUnion(isLocalShuffleUnion);
+        }
+
         return setOperationFragment;
     }
 
@@ -3271,4 +3303,18 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
         return child instanceof PhysicalRelation;
     }
+
+    private boolean setLocalRandomPartition(
+            Map<Integer, ExchangeNode> exchangeIdToExchangeNode, 
DataStreamSink dataStreamSink) {
+        ExchangeNode exchangeNode = exchangeIdToExchangeNode.get(
+                dataStreamSink.getExchNodeId().asInt());
+        if (exchangeNode == null) {
+            return false;
+        }
+        exchangeNode.setPartitionType(TPartitionType.RANDOM_LOCAL_SHUFFLE);
+
+        DataPartition p2pPartition = new 
DataPartition(TPartitionType.RANDOM_LOCAL_SHUFFLE);
+        dataStreamSink.setOutputPartition(p2pPartition);
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
index bc20d3efa17..ce75dd3dbfa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
@@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SchemaScanNode;
+import org.apache.doris.planner.UnionNode;
 import org.apache.doris.thrift.TExplainLevel;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -215,7 +216,10 @@ public class UnassignedJobBuilder {
     private UnassignedJob buildShuffleJob(
             StatementContext statementContext, PlanFragment planFragment,
             ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
-        if (planFragment.isPartitioned()) {
+        if 
(planFragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance)
+                
.stream().map(UnionNode.class::cast).anyMatch(UnionNode::isLocalShuffleUnion)) {
+            return new UnassignedLocalShuffleUnionJob(statementContext, 
planFragment, inputJobs);
+        } else if (planFragment.isPartitioned()) {
             return new UnassignedShuffleJob(statementContext, planFragment, 
inputJobs);
         } else {
             return new UnassignedGatherJob(statementContext, planFragment, 
inputJobs);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
new file mode 100644
index 00000000000..5c212507275
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.UnionNode;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * this class is used to do a local shuffle within the same backend, to save 
network io.
+ *
+ * for example: we have A/B/C three backend, and every backend process 3 
instances before the Union,
+ *              then the Union will generate same instances for the source 
backend, and every source
+ *              instances will random local shuffle to the self backend's 
three target instances, like this:
+ *
+ * UnionNode(9 target instances, [A4, B4, C4, A5, B5, C5, A6, B6, C6]) -- say 
there has 3 backends: A/B/C
+ * |
+ * +- ExchangeNode(3 source instances, [A1, B1, C1]) -- A1 random local 
shuffle to A4/A5/A6,
+ * |                                                    B1 random local 
shuffle to B4/B5/B6,
+ * |                                                    C1 random local 
shuffle to C4/C5/C6
+ * |
+ * +- ExchangeNode(3 source instances, [A2, B2, C2]) -- A2 random local 
shuffle to A4/A5/A6,
+ * |                                                    B2 random local 
shuffle to B4/B5/B6,
+ * |                                                    C2 random local 
shuffle to C4/C5/C6
+ * |
+ * +- ExchangeNode(3 source instances, [A3, B3, C3]) -- A3 random local 
shuffle to A4/A5/A6,
+ *                                                      B3 random local 
shuffle to B4/B5/B6,
+ *                                                      C3 random local 
shuffle to C4/C5/C6
+ */
+public class UnassignedLocalShuffleUnionJob extends AbstractUnassignedJob {
+
+    public UnassignedLocalShuffleUnionJob(StatementContext statementContext, 
PlanFragment fragment,
+            ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
+        super(statementContext, fragment, ImmutableList.of(), 
exchangeToChildJob);
+    }
+
+    @Override
+    public List<AssignedJob> computeAssignedJobs(
+            DistributeContext context, ListMultimap<ExchangeNode, AssignedJob> 
inputJobs) {
+        ConnectContext connectContext = statementContext.getConnectContext();
+        DefaultScanSource noScanSource = DefaultScanSource.empty();
+        List<AssignedJob> unionInstances = 
Lists.newArrayListWithCapacity(inputJobs.size());
+
+        List<UnionNode> unionNodes = 
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
+        Set<Integer> exchangeIdToUnion = Sets.newLinkedHashSet();
+        for (UnionNode unionNode : unionNodes) {
+            for (PlanNode child : unionNode.getChildren()) {
+                if (child instanceof ExchangeNode) {
+                    exchangeIdToUnion.add(child.getId().asInt());
+                }
+            }
+        }
+
+        int id = 0;
+        for (Entry<ExchangeNode, Collection<AssignedJob>> 
exchangeNodeToSources : inputJobs.asMap().entrySet()) {
+            ExchangeNode exchangeNode = exchangeNodeToSources.getKey();
+            if (!exchangeIdToUnion.contains(exchangeNode.getId().asInt())) {
+                continue;
+            }
+            for (AssignedJob inputInstance : exchangeNodeToSources.getValue()) 
{
+                StaticAssignedJob unionInstance = new StaticAssignedJob(
+                        id++, connectContext.nextInstanceId(), this,
+                        inputInstance.getAssignedWorker(), noScanSource
+                );
+                unionInstances.add(unionInstance);
+            }
+        }
+        return unionInstances;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 648fbace47c..b78bd43677b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -65,7 +65,8 @@ public class DataPartition {
         Preconditions.checkState(type == TPartitionType.UNPARTITIONED
                 || type == TPartitionType.RANDOM
                 || type == TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED
-                || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED);
+                || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED
+                || type == TPartitionType.RANDOM_LOCAL_SHUFFLE);
         this.type = type;
         this.partitionExprs = ImmutableList.of();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 69883fe7988..09f02890879 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -68,7 +68,7 @@ public class ExchangeNode extends PlanNode {
         offset = 0;
         limit = -1;
         this.conjuncts = Collections.emptyList();
-        children.add(inputNode);
+        this.children.add(inputNode);
         computeTupleIds();
     }
 
@@ -175,8 +175,15 @@ public class ExchangeNode extends PlanNode {
      */
     @Override
     public boolean isSerialOperator() {
-        return (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().isUseSerialExchange()
-                || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo 
== null;
+        return (
+                (
+                    ConnectContext.get() != null
+                        && 
ConnectContext.get().getSessionVariable().isUseSerialExchange()
+                        && (partitionType != 
TPartitionType.RANDOM_LOCAL_SHUFFLE)
+                )
+                || partitionType == TPartitionType.UNPARTITIONED
+            )
+            && mergeInfo == null;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 09d366f5456..15cc0ae0d74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -26,6 +26,8 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
 public class UnionNode extends SetOperationNode {
+    private boolean localShuffleUnion;
+
     public UnionNode(PlanNodeId id, TupleId tupleId) {
         super(id, tupleId, "UNION", StatisticalType.UNION_NODE);
     }
@@ -41,4 +43,12 @@ public class UnionNode extends SetOperationNode {
     public boolean isSerialOperator() {
         return children.isEmpty();
     }
+
+    public boolean isLocalShuffleUnion() {
+        return localShuffleUnion;
+    }
+
+    public void setLocalShuffleUnion(boolean localShuffleUnion) {
+        this.localShuffleUnion = localShuffleUnion;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
new file mode 100644
index 00000000000..66f5e55c539
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.distribute;
+
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedLocalShuffleUnionJob;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.DataStreamSink;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.MultiCastDataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.UnionNode;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LocalShuffleUnionTest extends TestWithFeService {
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        connectContext.setDatabase("test");
+        createTable("create table test.tbl(id int) 
properties('replication_num' = '1')");
+    }
+
+    @Test
+    public void testLocalShuffleUnion() throws Exception {
+        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+        StmtExecutor stmtExecutor = executeNereidsSql(
+                "explain distributed plan select * from test.tbl union all 
select * from test.tbl");
+        List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
+        assertHasLocalShuffleUnion(fragments);
+    }
+
+    @Test
+    public void testLocalShuffleUnionWithCte() throws Exception {
+        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+        StmtExecutor stmtExecutor = executeNereidsSql(
+                "explain distributed plan with a as (select * from test.tbl) 
select * from a union all select * from a");
+        List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
+        assertHasLocalShuffleUnion(fragments);
+    }
+
+    @Test
+    public void testLocalShuffleUnionWithJoin() throws Exception {
+        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+        StmtExecutor stmtExecutor = executeNereidsSql(
+                "explain distributed plan select * from (select * from 
test.tbl union all select * from test.tbl)a left join[broadcast] (select * from 
test.tbl)b on a.id=b.id");
+        List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
+        assertHasLocalShuffleUnion(fragments);
+
+        FragmentIdMapping<DistributedPlan> distributedPlans
+                = ((NereidsPlanner) 
stmtExecutor.planner()).getDistributedPlans();
+        for (DistributedPlan plan : distributedPlans.values()) {
+            PipelineDistributedPlan pipelineDistributedPlan = 
(PipelineDistributedPlan) plan;
+            if (pipelineDistributedPlan.getFragmentJob() instanceof 
UnassignedLocalShuffleUnionJob) {
+                List<AssignedJob> sourcesInstances = 
pipelineDistributedPlan.getInputs()
+                        .values()
+                        .stream()
+                        .flatMap(source -> ((PipelineDistributedPlan) 
source).getInstanceJobs().stream())
+                        .collect(Collectors.toList());
+
+                List<AssignedJob> broadSourceInstances = 
pipelineDistributedPlan.getInputs()
+                        .entries()
+                        .stream()
+                        .filter(kv -> kv.getKey().getPartitionType() != 
TPartitionType.RANDOM_LOCAL_SHUFFLE)
+                        .flatMap(kv -> ((PipelineDistributedPlan) 
kv.getValue()).getInstanceJobs().stream())
+                        .collect(Collectors.toList());
+
+                Assertions.assertTrue(
+                        pipelineDistributedPlan.getInstanceJobs().size() < 
sourcesInstances.size()
+                );
+
+                Assertions.assertEquals(
+                        (sourcesInstances.size() - 
broadSourceInstances.size()),
+                        pipelineDistributedPlan.getInstanceJobs().size()
+                );
+            }
+        }
+    }
+
+    private void assertHasLocalShuffleUnion(List<PlanFragment> fragments) {
+        boolean hasLocalShuffleUnion = false;
+        for (PlanFragment fragment : fragments) {
+            List<PlanNode> unions = 
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
+            for (PlanNode planNode : unions) {
+                UnionNode union = (UnionNode) planNode;
+                assertUnionIsInplace(union, fragment);
+                hasLocalShuffleUnion = true;
+            }
+        }
+        Assertions.assertTrue(hasLocalShuffleUnion);
+    }
+
+    private void assertUnionIsInplace(UnionNode unionNode, PlanFragment 
unionFragment) {
+        Assertions.assertTrue(unionNode.isLocalShuffleUnion());
+        for (PlanNode child : unionNode.getChildren()) {
+            if (child instanceof ExchangeNode) {
+                ExchangeNode exchangeNode = (ExchangeNode) child;
+                Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, 
exchangeNode.getPartitionType());
+                for (PlanFragment childFragment : unionFragment.getChildren()) 
{
+                    DataSink sink = childFragment.getSink();
+                    if (sink instanceof DataStreamSink && 
sink.getExchNodeId().asInt() == exchangeNode.getId().asInt()) {
+                        
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, 
sink.getOutputPartition().getType());
+                    } else if (sink instanceof MultiCastDataSink) {
+                        for (DataStreamSink dataStreamSink : 
((MultiCastDataSink) sink).getDataStreamSinks()) {
+                            if (dataStreamSink.getExchNodeId().asInt() == 
exchangeNode.getId().asInt()) {
+                                
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, 
dataStreamSink.getOutputPartition().getType());
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 86a2d9be555..69100109a1b 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -50,6 +50,9 @@ enum TPartitionType {
 
  // used for hive unpartitioned table
  HIVE_TABLE_SINK_UNPARTITIONED = 8
+
+  // used for random local shuffle union: one source instance randomly sends 
data to target instances in the same backend
+  RANDOM_LOCAL_SHUFFLE = 9
 }
 
 enum TDistributionType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to