This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 31557ff76c2 branch-4.0: [opt](nereids) support random local shuffle
for union #58042 (#58555)
31557ff76c2 is described below
commit 31557ff76c24e17843009dd1c7e18d7db5caeb21
Author: 924060929 <[email protected]>
AuthorDate: Tue Dec 2 10:02:07 2025 +0800
branch-4.0: [opt](nereids) support random local shuffle for union #58042
(#58555)
picked from #58042
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 23 +++-
be/src/pipeline/exec/exchange_sink_operator.h | 1 +
.../glue/translator/PhysicalPlanTranslator.java | 46 +++++++
.../worker/job/UnassignedJobBuilder.java | 6 +-
.../worker/job/UnassignedLocalShuffleUnionJob.java | 99 +++++++++++++++
.../org/apache/doris/planner/DataPartition.java | 3 +-
.../org/apache/doris/planner/ExchangeNode.java | 13 +-
.../java/org/apache/doris/planner/UnionNode.java | 10 ++
.../nereids/distribute/LocalShuffleUnionTest.java | 141 +++++++++++++++++++++
gensrc/thrift/Partitions.thrift | 3 +
10 files changed, 339 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71a0edf31b7..be65e5f176d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -104,6 +104,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
for (int i = 0; i < channels.size(); ++i) {
if (channels[i]->is_local()) {
local_size++;
+ local_channel_ids.emplace_back(i);
_last_local_channel_idx = i;
}
}
@@ -286,7 +287,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type ==
TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
sink.output_partition.type ==
TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
- sink.output_partition.type ==
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
+ sink.output_partition.type ==
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED ||
+ sink.output_partition.type == TPartitionType::RANDOM_LOCAL_SHUFFLE);
#endif
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
@@ -493,6 +495,25 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
}
local_state.current_channel_idx = (local_state.current_channel_idx +
1) % _writer_count;
+ } else if (_part_type == TPartitionType::RANDOM_LOCAL_SHUFFLE) {
+ DCHECK_LT(local_state.current_channel_idx,
local_state.local_channel_ids.size())
+ << "local_state.current_channel_idx: " <<
local_state.current_channel_idx
+ << ", local_channel_ids: " <<
to_string(local_state.local_channel_ids);
+
+ // 1. select channel
+ auto& current_channel =
+ local_state
+
.channels[local_state.local_channel_ids[local_state.current_channel_idx]];
+ DCHECK(current_channel->is_local())
+ << "Only local channel are supported, current_channel_idx: "
+ <<
local_state.local_channel_ids[local_state.current_channel_idx];
+ if (!current_channel->is_receiver_eof()) {
+ // 2. serialize, send and rollover block
+ auto status = current_channel->send_local_block(block, eos, true);
+ HANDLE_CHANNEL_STATUS(state, current_channel, status);
+ }
+ local_state.current_channel_idx =
+ (local_state.current_channel_idx + 1) %
local_state.local_channel_ids.size();
} else {
// Range partition
// 1. calculate range
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 6900c5491b4..37651f268ea 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -106,6 +106,7 @@ public:
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if
_random == true
bool _only_local_exchange {false};
+ std::vector<uint32_t> local_channel_ids;
void on_channel_finished(InstanceLoId channel_id);
vectorized::PartitionerBase* partitioner() const { return
_partitioner.get(); }
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 731dbc64fc1..c7b79c80c72 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -186,6 +186,7 @@ import
org.apache.doris.planner.BackendPartitionedSchemaScanNode;
import org.apache.doris.planner.BlackholeSink;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.DictionarySink;
import org.apache.doris.planner.EmptySetNode;
@@ -246,6 +247,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -2302,6 +2304,36 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
setOperationNode.setColocate(true);
}
+ // Decide whether to use LocalShuffleUnion.
+ // The backend needs `enable_local_exchange=true` to compute whether a
channel is `local`,
+ // and LocalShuffleUnion needs `local` channels to do random local
shuffle, so we need to check
+ // `enable_local_exchange`.
+ if (setOperation instanceof PhysicalUnion
+ &&
context.getConnectContext().getSessionVariable().getEnableLocalExchange()) {
+ boolean isLocalShuffleUnion = false;
+ if (setOperation.getPhysicalProperties().getDistributionSpec()
instanceof DistributionSpecExecutionAny) {
+ Map<Integer, ExchangeNode> exchangeIdToExchangeNode = new
IdentityHashMap<>();
+ for (PlanNode child : setOperationNode.getChildren()) {
+ if (child instanceof ExchangeNode) {
+ exchangeIdToExchangeNode.put(child.getId().asInt(),
(ExchangeNode) child);
+ }
+ }
+
+ for (PlanFragment childFragment :
setOperationFragment.getChildren()) {
+ DataSink sink = childFragment.getSink();
+ if (sink instanceof DataStreamSink) {
+ isLocalShuffleUnion |=
setLocalRandomPartition(exchangeIdToExchangeNode, (DataStreamSink) sink);
+ } else if (sink instanceof MultiCastDataSink) {
+ MultiCastDataSink multiCastDataSink =
(MultiCastDataSink) sink;
+ for (DataStreamSink dataStreamSink :
multiCastDataSink.getDataStreamSinks()) {
+ isLocalShuffleUnion |=
setLocalRandomPartition(exchangeIdToExchangeNode, dataStreamSink);
+ }
+ }
+ }
+ }
+ ((UnionNode)
setOperationNode).setLocalShuffleUnion(isLocalShuffleUnion);
+ }
+
return setOperationFragment;
}
@@ -3271,4 +3303,18 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
return child instanceof PhysicalRelation;
}
+
+ private boolean setLocalRandomPartition(
+ Map<Integer, ExchangeNode> exchangeIdToExchangeNode,
DataStreamSink dataStreamSink) {
+ ExchangeNode exchangeNode = exchangeIdToExchangeNode.get(
+ dataStreamSink.getExchNodeId().asInt());
+ if (exchangeNode == null) {
+ return false;
+ }
+ exchangeNode.setPartitionType(TPartitionType.RANDOM_LOCAL_SHUFFLE);
+
+ DataPartition p2pPartition = new
DataPartition(TPartitionType.RANDOM_LOCAL_SHUFFLE);
+ dataStreamSink.setOutputPartition(p2pPartition);
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
index bc20d3efa17..ce75dd3dbfa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
@@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
+import org.apache.doris.planner.UnionNode;
import org.apache.doris.thrift.TExplainLevel;
import com.google.common.collect.ArrayListMultimap;
@@ -215,7 +216,10 @@ public class UnassignedJobBuilder {
private UnassignedJob buildShuffleJob(
StatementContext statementContext, PlanFragment planFragment,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
- if (planFragment.isPartitioned()) {
+ if
(planFragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance)
+
.stream().map(UnionNode.class::cast).anyMatch(UnionNode::isLocalShuffleUnion)) {
+ return new UnassignedLocalShuffleUnionJob(statementContext,
planFragment, inputJobs);
+ } else if (planFragment.isPartitioned()) {
return new UnassignedShuffleJob(statementContext, planFragment,
inputJobs);
} else {
return new UnassignedGatherJob(statementContext, planFragment,
inputJobs);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
new file mode 100644
index 00000000000..5c212507275
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.UnionNode;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * This class performs a local shuffle between instances on the same backend, to save
network I/O.
+ *
+ * For example: we have three backends A/B/C, and every backend processes 3
instances before the Union,
+ * then the Union will generate the same number of instances on each source
backend, and every source
+ * instance will randomly local-shuffle to its own backend's
three target instances, like this:
+ *
+ * UnionNode(9 target instances, [A4, B4, C4, A5, B5, C5, A6, B6, C6]) -- say
there are 3 backends: A/B/C
+ * |
+ * +- ExchangeNode(3 source instances, [A1, B1, C1]) -- A1 random local
shuffle to A4/A5/A6,
+ * | B1 random local
shuffle to B4/B5/B6,
+ * | C1 random local
shuffle to C4/C5/C6
+ * |
+ * +- ExchangeNode(3 source instances, [A2, B2, C2]) -- A2 random local
shuffle to A4/A5/A6,
+ * | B2 random local
shuffle to B4/B5/B6,
+ * | C2 random local
shuffle to C4/C5/C6
+ * |
+ * +- ExchangeNode(3 source instances, [A3, B3, C3]) -- A3 random local
shuffle to A4/A5/A6,
+ * B3 random local
shuffle to B4/B5/B6,
+ * C3 random local
shuffle to C4/C5/C6
+ */
+public class UnassignedLocalShuffleUnionJob extends AbstractUnassignedJob {
+
+ public UnassignedLocalShuffleUnionJob(StatementContext statementContext,
PlanFragment fragment,
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
+ super(statementContext, fragment, ImmutableList.of(),
exchangeToChildJob);
+ }
+
+ @Override
+ public List<AssignedJob> computeAssignedJobs(
+ DistributeContext context, ListMultimap<ExchangeNode, AssignedJob>
inputJobs) {
+ ConnectContext connectContext = statementContext.getConnectContext();
+ DefaultScanSource noScanSource = DefaultScanSource.empty();
+ List<AssignedJob> unionInstances =
Lists.newArrayListWithCapacity(inputJobs.size());
+
+ List<UnionNode> unionNodes =
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
+ Set<Integer> exchangeIdToUnion = Sets.newLinkedHashSet();
+ for (UnionNode unionNode : unionNodes) {
+ for (PlanNode child : unionNode.getChildren()) {
+ if (child instanceof ExchangeNode) {
+ exchangeIdToUnion.add(child.getId().asInt());
+ }
+ }
+ }
+
+ int id = 0;
+ for (Entry<ExchangeNode, Collection<AssignedJob>>
exchangeNodeToSources : inputJobs.asMap().entrySet()) {
+ ExchangeNode exchangeNode = exchangeNodeToSources.getKey();
+ if (!exchangeIdToUnion.contains(exchangeNode.getId().asInt())) {
+ continue;
+ }
+ for (AssignedJob inputInstance : exchangeNodeToSources.getValue())
{
+ StaticAssignedJob unionInstance = new StaticAssignedJob(
+ id++, connectContext.nextInstanceId(), this,
+ inputInstance.getAssignedWorker(), noScanSource
+ );
+ unionInstances.add(unionInstance);
+ }
+ }
+ return unionInstances;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 648fbace47c..b78bd43677b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -65,7 +65,8 @@ public class DataPartition {
Preconditions.checkState(type == TPartitionType.UNPARTITIONED
|| type == TPartitionType.RANDOM
|| type == TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED
- || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED);
+ || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED
+ || type == TPartitionType.RANDOM_LOCAL_SHUFFLE);
this.type = type;
this.partitionExprs = ImmutableList.of();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 69883fe7988..09f02890879 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -68,7 +68,7 @@ public class ExchangeNode extends PlanNode {
offset = 0;
limit = -1;
this.conjuncts = Collections.emptyList();
- children.add(inputNode);
+ this.children.add(inputNode);
computeTupleIds();
}
@@ -175,8 +175,15 @@ public class ExchangeNode extends PlanNode {
*/
@Override
public boolean isSerialOperator() {
- return (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().isUseSerialExchange()
- || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo
== null;
+ return (
+ (
+ ConnectContext.get() != null
+ &&
ConnectContext.get().getSessionVariable().isUseSerialExchange()
+ && (partitionType !=
TPartitionType.RANDOM_LOCAL_SHUFFLE)
+ )
+ || partitionType == TPartitionType.UNPARTITIONED
+ )
+ && mergeInfo == null;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 09d366f5456..15cc0ae0d74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -26,6 +26,8 @@ import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
public class UnionNode extends SetOperationNode {
+ private boolean localShuffleUnion;
+
public UnionNode(PlanNodeId id, TupleId tupleId) {
super(id, tupleId, "UNION", StatisticalType.UNION_NODE);
}
@@ -41,4 +43,12 @@ public class UnionNode extends SetOperationNode {
public boolean isSerialOperator() {
return children.isEmpty();
}
+
+ public boolean isLocalShuffleUnion() {
+ return localShuffleUnion;
+ }
+
+ public void setLocalShuffleUnion(boolean localShuffleUnion) {
+ this.localShuffleUnion = localShuffleUnion;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
new file mode 100644
index 00000000000..66f5e55c539
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.distribute;
+
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedLocalShuffleUnionJob;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.DataStreamSink;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.MultiCastDataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.UnionNode;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LocalShuffleUnionTest extends TestWithFeService {
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+ connectContext.setDatabase("test");
+ createTable("create table test.tbl(id int)
properties('replication_num' = '1')");
+ }
+
+ @Test
+ public void testLocalShuffleUnion() throws Exception {
+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ StmtExecutor stmtExecutor = executeNereidsSql(
+ "explain distributed plan select * from test.tbl union all
select * from test.tbl");
+ List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
+ assertHasLocalShuffleUnion(fragments);
+ }
+
+ @Test
+ public void testLocalShuffleUnionWithCte() throws Exception {
+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ StmtExecutor stmtExecutor = executeNereidsSql(
+ "explain distributed plan with a as (select * from test.tbl)
select * from a union all select * from a");
+ List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
+ assertHasLocalShuffleUnion(fragments);
+ }
+
+ @Test
+ public void testLocalShuffleUnionWithJoin() throws Exception {
+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ StmtExecutor stmtExecutor = executeNereidsSql(
+ "explain distributed plan select * from (select * from
test.tbl union all select * from test.tbl)a left join[broadcast] (select * from
test.tbl)b on a.id=b.id");
+ List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
+ assertHasLocalShuffleUnion(fragments);
+
+ FragmentIdMapping<DistributedPlan> distributedPlans
+ = ((NereidsPlanner)
stmtExecutor.planner()).getDistributedPlans();
+ for (DistributedPlan plan : distributedPlans.values()) {
+ PipelineDistributedPlan pipelineDistributedPlan =
(PipelineDistributedPlan) plan;
+ if (pipelineDistributedPlan.getFragmentJob() instanceof
UnassignedLocalShuffleUnionJob) {
+ List<AssignedJob> sourcesInstances =
pipelineDistributedPlan.getInputs()
+ .values()
+ .stream()
+ .flatMap(source -> ((PipelineDistributedPlan)
source).getInstanceJobs().stream())
+ .collect(Collectors.toList());
+
+ List<AssignedJob> broadSourceInstances =
pipelineDistributedPlan.getInputs()
+ .entries()
+ .stream()
+ .filter(kv -> kv.getKey().getPartitionType() !=
TPartitionType.RANDOM_LOCAL_SHUFFLE)
+ .flatMap(kv -> ((PipelineDistributedPlan)
kv.getValue()).getInstanceJobs().stream())
+ .collect(Collectors.toList());
+
+ Assertions.assertTrue(
+ pipelineDistributedPlan.getInstanceJobs().size() <
sourcesInstances.size()
+ );
+
+ Assertions.assertEquals(
+ (sourcesInstances.size() -
broadSourceInstances.size()),
+ pipelineDistributedPlan.getInstanceJobs().size()
+ );
+ }
+ }
+ }
+
+ private void assertHasLocalShuffleUnion(List<PlanFragment> fragments) {
+ boolean hasLocalShuffleUnion = false;
+ for (PlanFragment fragment : fragments) {
+ List<PlanNode> unions =
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
+ for (PlanNode planNode : unions) {
+ UnionNode union = (UnionNode) planNode;
+ assertUnionIsInplace(union, fragment);
+ hasLocalShuffleUnion = true;
+ }
+ }
+ Assertions.assertTrue(hasLocalShuffleUnion);
+ }
+
+ private void assertUnionIsInplace(UnionNode unionNode, PlanFragment
unionFragment) {
+ Assertions.assertTrue(unionNode.isLocalShuffleUnion());
+ for (PlanNode child : unionNode.getChildren()) {
+ if (child instanceof ExchangeNode) {
+ ExchangeNode exchangeNode = (ExchangeNode) child;
+ Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE,
exchangeNode.getPartitionType());
+ for (PlanFragment childFragment : unionFragment.getChildren())
{
+ DataSink sink = childFragment.getSink();
+ if (sink instanceof DataStreamSink &&
sink.getExchNodeId().asInt() == exchangeNode.getId().asInt()) {
+
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE,
sink.getOutputPartition().getType());
+ } else if (sink instanceof MultiCastDataSink) {
+ for (DataStreamSink dataStreamSink :
((MultiCastDataSink) sink).getDataStreamSinks()) {
+ if (dataStreamSink.getExchNodeId().asInt() ==
exchangeNode.getId().asInt()) {
+
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE,
dataStreamSink.getOutputPartition().getType());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 86a2d9be555..69100109a1b 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -50,6 +50,9 @@ enum TPartitionType {
// used for hive unparititoned table
HIVE_TABLE_SINK_UNPARTITIONED = 8
+
+ // used for random local shuffle union: one source instance randomly sends data
to target instances in the same backend
+ RANDOM_LOCAL_SHUFFLE = 9
}
enum TDistributionType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]