This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 082cfe82a0f0f6867b01f04e579232ce294751a3
Author: noorall <863485...@qq.com>
AuthorDate: Wed Nov 6 17:24:56 2024 +0800

    [FLINK-36576][runtime] Introduce two correlations to allow operators to 
flexibly choose the data distribution algorithm
---
 .../org/apache/flink/runtime/jobgraph/JobEdge.java | 39 +++++++++++++++++-
 .../apache/flink/runtime/jobgraph/JobVertex.java   | 35 +++++++++++++++-
 .../flink/streaming/api/graph/StreamEdge.java      | 48 ++++++++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java      | 10 ++++-
 .../IntermediateResultPartitionTest.java           | 10 ++++-
 5 files changed, 137 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
index 92890f7b581..02e5a8c9082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
@@ -65,6 +65,22 @@ public class JobEdge implements java.io.Serializable {
     /** Optional description of the caching inside an operator, to be 
displayed in the JSON plan. */
     private String operatorLevelCachingDescription;
 
+    private final int typeNumber;
+
+    /**
+     * Whether there are relationships between multiple inputs: if the records corresponding to
+     * the same key from one input are split, the records with that key from the other inputs
+     * must be duplicated (that is, sent to every downstream node that receives the split
+     * records).
+     */
+    private final boolean interInputsKeysCorrelated;
+
+    /**
+     * Whether records with the same key are correlated and must be sent to 
the same downstream task
+     * to be processed together.
+     */
+    private final boolean intraInputKeyCorrelated;
+
     /**
      * Constructs a new job edge, that connects an intermediate result to a 
consumer task.
      *
@@ -78,7 +94,10 @@ public class JobEdge implements java.io.Serializable {
             JobVertex target,
             DistributionPattern distributionPattern,
             boolean isBroadcast,
-            boolean isForward) {
+            boolean isForward,
+            int typeNumber,
+            boolean interInputsKeysCorrelated,
+            boolean intraInputKeyCorrelated) {
         if (source == null || target == null || distributionPattern == null) {
             throw new NullPointerException();
         }
@@ -87,6 +106,9 @@ public class JobEdge implements java.io.Serializable {
         this.source = source;
         this.isBroadcast = isBroadcast;
         this.isForward = isForward;
+        this.typeNumber = typeNumber;
+        this.interInputsKeysCorrelated = interInputsKeysCorrelated;
+        this.intraInputKeyCorrelated = intraInputKeyCorrelated;
     }
 
     /**
@@ -233,6 +255,21 @@ public class JobEdge implements java.io.Serializable {
         this.operatorLevelCachingDescription = operatorLevelCachingDescription;
     }
 
+    /** Gets typeNumber of the edge. */
+    public int getTypeNumber() {
+        return typeNumber;
+    }
+
+    /** Gets whether records with the same key on this edge are correlated with other inputs. */
+    public boolean areInterInputsKeysCorrelated() {
+        return interInputsKeysCorrelated;
+    }
+
+    /** Gets whether records with the same key on this edge are correlated. */
+    public boolean isIntraInputKeyCorrelated() {
+        return intraInputKeyCorrelated;
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 246c3c8ac53..e2e3beb9270 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
@@ -515,6 +516,7 @@ public class JobVertex implements java.io.Serializable {
                 id, key -> new IntermediateDataSet(id, partitionType, this));
     }
 
+    @VisibleForTesting
     public JobEdge connectNewDataSetAsInput(
             JobVertex input,
             DistributionPattern distPattern,
@@ -522,11 +524,42 @@ public class JobVertex implements java.io.Serializable {
             IntermediateDataSetID intermediateDataSetId,
             boolean isBroadcast,
             boolean isForward) {
+        return connectNewDataSetAsInput(
+                input,
+                distPattern,
+                partitionType,
+                intermediateDataSetId,
+                isBroadcast,
+                isForward,
+                -1,
+                distPattern != DistributionPattern.POINTWISE,
+                distPattern != DistributionPattern.POINTWISE && !isBroadcast);
+    }
+
+    public JobEdge connectNewDataSetAsInput(
+            JobVertex input,
+            DistributionPattern distPattern,
+            ResultPartitionType partitionType,
+            IntermediateDataSetID intermediateDataSetId,
+            boolean isBroadcast,
+            boolean isForward,
+            int typeNumber,
+            boolean interInputsKeysCorrelated,
+            boolean intraInputKeyCorrelated) {
 
         IntermediateDataSet dataSet =
                 input.getOrCreateResultDataSet(intermediateDataSetId, 
partitionType);
 
-        JobEdge edge = new JobEdge(dataSet, this, distPattern, isBroadcast, 
isForward);
+        JobEdge edge =
+                new JobEdge(
+                        dataSet,
+                        this,
+                        distPattern,
+                        isBroadcast,
+                        isForward,
+                        typeNumber,
+                        interInputsKeysCorrelated,
+                        intraInputKeyCorrelated);
         this.inputs.add(edge);
         dataSet.addConsumer(edge);
         return edge;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 65472508cb4..3d604a0887d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.OutputTag;
 
@@ -28,6 +30,7 @@ import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An edge in the streaming topology. One edge like this does not necessarily 
gets converted to a
@@ -77,6 +80,20 @@ public class StreamEdge implements Serializable {
 
     private final IntermediateDataSetID intermediateDatasetIdToProduce;
 
+    /**
+     * Whether there are relationships between multiple inputs: if the data corresponding to a
+     * specific join key from one input is split, the data with that join key from the other
+     * inputs must be duplicated (that is, sent to every downstream node that receives the split
+     * data).
+     */
+    private boolean interInputsKeysCorrelated;
+
+    /**
+     * Whether, for this edge, the data corresponding to a specific join key must be sent to the
+     * same downstream subtask.
+     */
+    private boolean intraInputKeyCorrelated;
+
     public StreamEdge(
             StreamNode sourceVertex,
             StreamNode targetVertex,
@@ -150,6 +167,9 @@ public class StreamEdge implements Serializable {
                         + outputPartitioner
                         + "_"
                         + uniqueId;
+        if (outputPartitioner != null) {
+            configureKeyCorrelation(outputPartitioner);
+        }
     }
 
     public int getSourceId() {
@@ -177,6 +197,7 @@ public class StreamEdge implements Serializable {
     }
 
     public void setPartitioner(StreamPartitioner<?> partitioner) {
+        configureKeyCorrelation(partitioner);
         this.outputPartitioner = partitioner;
     }
 
@@ -247,4 +268,31 @@ public class StreamEdge implements Serializable {
     public String getEdgeId() {
         return edgeId;
     }
+
+    private void configureKeyCorrelation(StreamPartitioner<?> partitioner) {
+        // Set safe default correlations based on the partitioner so that the program works
+        // correctly by default. The final values of the correlations can be flexibly
+        // determined by the operator.
+        if (partitioner.isPointwise()) {
+            this.intraInputKeyCorrelated = partitioner instanceof 
ForwardPartitioner;
+            this.interInputsKeysCorrelated = false;
+        } else {
+            this.intraInputKeyCorrelated = !(partitioner instanceof 
RebalancePartitioner);
+            this.interInputsKeysCorrelated = !(partitioner instanceof 
RebalancePartitioner);
+        }
+    }
+
+    public boolean areInterInputsKeysCorrelated() {
+        return interInputsKeysCorrelated;
+    }
+
+    public boolean isIntraInputKeyCorrelated() {
+        return intraInputKeyCorrelated;
+    }
+
+    public void setIntraInputKeyCorrelated(boolean intraInputKeyCorrelated) {
+        // We hope to strictly control the behavior of this modification to 
avoid unexpected errors.
+        checkState(interInputsKeysCorrelated, "interInputsKeysCorrelated must 
be true");
+        this.intraInputKeyCorrelated = intraInputKeyCorrelated;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8a807db5013..94707f87c2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1600,7 +1600,10 @@ public class StreamingJobGraphGenerator {
                             resultPartitionType,
                             output.getDataSetId(),
                             partitioner.isBroadcast(),
-                            
partitioner.getClass().equals(ForwardPartitioner.class));
+                            
partitioner.getClass().equals(ForwardPartitioner.class),
+                            edge.getTypeNumber(),
+                            edge.areInterInputsKeysCorrelated(),
+                            edge.isIntraInputKeyCorrelated());
         } else {
             jobEdge =
                     downStreamVertex.connectNewDataSetAsInput(
@@ -1609,7 +1612,10 @@ public class StreamingJobGraphGenerator {
                             resultPartitionType,
                             output.getDataSetId(),
                             partitioner.isBroadcast(),
-                            
partitioner.getClass().equals(ForwardPartitioner.class));
+                            
partitioner.getClass().equals(ForwardPartitioner.class),
+                            edge.getTypeNumber(),
+                            edge.areInterInputsKeysCorrelated(),
+                            edge.isIntraInputKeyCorrelated());
         }
 
         // set strategy name so that web interface can show it.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index 5fb959f2ea5..1e129b7b2e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -221,7 +221,15 @@ public class IntermediateResultPartitionTest {
 
         // add a new job vertex
         dataSet.addConsumer(
-                new JobEdge(dataSet, sink2, DistributionPattern.ALL_TO_ALL, 
false, false));
+                new JobEdge(
+                        dataSet,
+                        sink2,
+                        DistributionPattern.ALL_TO_ALL,
+                        false,
+                        false,
+                        1,
+                        false,
+                        false));
         scheduler
                 .getExecutionGraph()
                 .addNewJobVertices(

Reply via email to