This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 082cfe82a0f0f6867b01f04e579232ce294751a3 Author: noorall <863485...@qq.com> AuthorDate: Wed Nov 6 17:24:56 2024 +0800 [FLINK-36576][runtime] Introduce two correlations to allow operators to flexibly choose the data distribution algorithm --- .../org/apache/flink/runtime/jobgraph/JobEdge.java | 39 +++++++++++++++++- .../apache/flink/runtime/jobgraph/JobVertex.java | 35 +++++++++++++++- .../flink/streaming/api/graph/StreamEdge.java | 48 ++++++++++++++++++++++ .../api/graph/StreamingJobGraphGenerator.java | 10 ++++- .../IntermediateResultPartitionTest.java | 10 ++++- 5 files changed, 137 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java index 92890f7b581..02e5a8c9082 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java @@ -65,6 +65,22 @@ public class JobEdge implements java.io.Serializable { /** Optional description of the caching inside an operator, to be displayed in the JSON plan. */ private String operatorLevelCachingDescription; + private final int typeNumber; + + /** + * There are relationships between multiple inputs, if the records corresponding to the same key + * from one input are split, the corresponding key records from the other inputs must be + * duplicated (meaning that they must be sent to the downstream nodes where the split data is + * sent). + */ + private final boolean interInputsKeysCorrelated; + + /** + * Whether records with the same key are correlated and must be sent to the same downstream task + * to be processed together. + */ + private final boolean intraInputKeyCorrelated; + /** * Constructs a new job edge, that connects an intermediate result to a consumer task. 
* @@ -78,7 +94,10 @@ public class JobEdge implements java.io.Serializable { JobVertex target, DistributionPattern distributionPattern, boolean isBroadcast, - boolean isForward) { + boolean isForward, + int typeNumber, + boolean interInputsKeysCorrelated, + boolean intraInputKeyCorrelated) { if (source == null || target == null || distributionPattern == null) { throw new NullPointerException(); } @@ -87,6 +106,9 @@ public class JobEdge implements java.io.Serializable { this.source = source; this.isBroadcast = isBroadcast; this.isForward = isForward; + this.typeNumber = typeNumber; + this.interInputsKeysCorrelated = interInputsKeysCorrelated; + this.intraInputKeyCorrelated = intraInputKeyCorrelated; } /** @@ -233,6 +255,21 @@ public class JobEdge implements java.io.Serializable { this.operatorLevelCachingDescription = operatorLevelCachingDescription; } + /** Gets the type number of the edge. */ + public int getTypeNumber() { + return typeNumber; + } + + /** Gets whether the records with the same key of this edge are correlated with other inputs. */ + public boolean areInterInputsKeysCorrelated() { + return interInputsKeysCorrelated; + } + + /** Gets whether the records with the same key of this edge are correlated. 
*/ + public boolean isIntraInputKeyCorrelated() { + return intraInputKeyCorrelated; + } + // -------------------------------------------------------------------------------------------- @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 246c3c8ac53..e2e3beb9270 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobgraph; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.configuration.Configuration; @@ -515,6 +516,7 @@ public class JobVertex implements java.io.Serializable { id, key -> new IntermediateDataSet(id, partitionType, this)); } + @VisibleForTesting public JobEdge connectNewDataSetAsInput( JobVertex input, DistributionPattern distPattern, @@ -522,11 +524,42 @@ public class JobVertex implements java.io.Serializable { IntermediateDataSetID intermediateDataSetId, boolean isBroadcast, boolean isForward) { + return connectNewDataSetAsInput( + input, + distPattern, + partitionType, + intermediateDataSetId, + isBroadcast, + isForward, + -1, + distPattern != DistributionPattern.POINTWISE, + distPattern != DistributionPattern.POINTWISE && !isBroadcast); + } + + public JobEdge connectNewDataSetAsInput( + JobVertex input, + DistributionPattern distPattern, + ResultPartitionType partitionType, + IntermediateDataSetID intermediateDataSetId, + boolean isBroadcast, + boolean isForward, + int typeNumber, + boolean interInputsKeysCorrelated, + boolean intraInputKeyCorrelated) { IntermediateDataSet dataSet = input.getOrCreateResultDataSet(intermediateDataSetId, partitionType); - JobEdge edge = new JobEdge(dataSet, this, distPattern, 
isBroadcast, isForward); + JobEdge edge = + new JobEdge( + dataSet, + this, + distPattern, + isBroadcast, + isForward, + typeNumber, + interInputsKeysCorrelated, + intraInputKeyCorrelated); this.inputs.add(edge); dataSet.addConsumer(edge); return edge; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index 65472508cb4..3d604a0887d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.streaming.api.transformations.StreamExchangeMode; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.OutputTag; @@ -28,6 +30,7 @@ import java.util.Objects; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * An edge in the streaming topology. One edge like this does not necessarily gets converted to a @@ -77,6 +80,20 @@ public class StreamEdge implements Serializable { private final IntermediateDataSetID intermediateDatasetIdToProduce; + /** + * There are relationships between multiple inputs, if the data corresponding to a specific join + * key from one input is split, the corresponding join key data from the other inputs must be + * duplicated (meaning that it must be sent to the downstream nodes where the split data is + * sent). 
+ */ + private boolean interInputsKeysCorrelated; + + /** + * For this edge the data corresponding to a specific join key must be sent to the same + * downstream subtask. + */ + private boolean intraInputKeyCorrelated; + public StreamEdge( StreamNode sourceVertex, StreamNode targetVertex, @@ -150,6 +167,9 @@ public class StreamEdge implements Serializable { + outputPartitioner + "_" + uniqueId; + if (outputPartitioner != null) { + configureKeyCorrelation(outputPartitioner); + } } public int getSourceId() { @@ -177,6 +197,7 @@ public class StreamEdge implements Serializable { } public void setPartitioner(StreamPartitioner<?> partitioner) { + configureKeyCorrelation(partitioner); this.outputPartitioner = partitioner; } @@ -247,4 +268,31 @@ public class StreamEdge implements Serializable { public String getEdgeId() { return edgeId; } + + private void configureKeyCorrelation(StreamPartitioner<?> partitioner) { + // Set a safe value of correlations based on the partitioner to ensure the program can + // work normally by default. The final value of the correlations can be flexibly determined + // by the operator. + if (partitioner.isPointwise()) { + this.intraInputKeyCorrelated = partitioner instanceof ForwardPartitioner; + this.interInputsKeysCorrelated = false; + } else { + this.intraInputKeyCorrelated = !(partitioner instanceof RebalancePartitioner); + this.interInputsKeysCorrelated = !(partitioner instanceof RebalancePartitioner); + } + } + + public boolean areInterInputsKeysCorrelated() { + return interInputsKeysCorrelated; + } + + public boolean isIntraInputKeyCorrelated() { + return intraInputKeyCorrelated; + } + + public void setIntraInputKeyCorrelated(boolean intraInputKeyCorrelated) { + // We hope to strictly control the behavior of this modification to avoid unexpected errors. 
+ checkState(interInputsKeysCorrelated, "interInputsKeysCorrelated must be true"); + this.intraInputKeyCorrelated = intraInputKeyCorrelated; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 8a807db5013..94707f87c2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1600,7 +1600,10 @@ public class StreamingJobGraphGenerator { resultPartitionType, output.getDataSetId(), partitioner.isBroadcast(), - partitioner.getClass().equals(ForwardPartitioner.class)); + partitioner.getClass().equals(ForwardPartitioner.class), + edge.getTypeNumber(), + edge.areInterInputsKeysCorrelated(), + edge.isIntraInputKeyCorrelated()); } else { jobEdge = downStreamVertex.connectNewDataSetAsInput( @@ -1609,7 +1612,10 @@ public class StreamingJobGraphGenerator { resultPartitionType, output.getDataSetId(), partitioner.isBroadcast(), - partitioner.getClass().equals(ForwardPartitioner.class)); + partitioner.getClass().equals(ForwardPartitioner.class), + edge.getTypeNumber(), + edge.areInterInputsKeysCorrelated(), + edge.isIntraInputKeyCorrelated()); } // set strategy name so that web interface can show it. 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java index 5fb959f2ea5..1e129b7b2e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -221,7 +221,15 @@ public class IntermediateResultPartitionTest { // add a new job vertex dataSet.addConsumer( - new JobEdge(dataSet, sink2, DistributionPattern.ALL_TO_ALL, false, false)); + new JobEdge( + dataSet, + sink2, + DistributionPattern.ALL_TO_ALL, + false, + false, + 1, + false, + false)); scheduler .getExecutionGraph() .addNewJobVertices(