gaoyunhaii commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r892215954


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##########
@@ -18,16 +18,25 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Comparator;
+import java.util.Map;
 import java.util.OptionalInt;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 /** Container for meta-data of a data set. */
 public final class DataSetMetaInfo {
     private static final int UNKNOWN = -1;
 
     private final int numRegisteredPartitions;
     private final int numTotalPartitions;
+    private final SortedMap<ResultPartitionID, ShuffleDescriptor>
+            shuffleDescriptorsOrderByPartitionNumber =

Review Comment:
   nit: `shuffleDescriptorsOrderByPartitionId` ?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -481,8 +486,18 @@ public IntermediateDataSet createAndAddResultDataSet(
 
     public JobEdge connectNewDataSetAsInput(
             JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
+        return this.connectNewDataSetAsInput(
+                input, distPattern, partitionType, new IntermediateDataSetID());
+    }
+
+    public JobEdge connectNewDataSetAsInput(
+            JobVertex input,
+            DistributionPattern distPattern,
+            ResultPartitionType partitionType,
+            IntermediateDataSetID intermediateDataSetId) {
 
-        IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
+        IntermediateDataSet dataSet =
+                input.createAndAddResultDataSet(intermediateDataSetId, partitionType);

Review Comment:
   The originally called method `createAndAddResultDataSet(ResultPartitionType partitionType)` seems to be unused now.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -481,8 +486,18 @@ public IntermediateDataSet createAndAddResultDataSet(
 
     public JobEdge connectNewDataSetAsInput(
             JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
+        return this.connectNewDataSetAsInput(

Review Comment:
   Do not use `this.` except in constructors.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##########
@@ -44,6 +53,15 @@ public int getNumTotalPartitions() {
         return numTotalPartitions;
     }
 
+    public void addShuffleDescriptors(

Review Comment:
   nit: this method could also return `DataSetMetaInfo`, so that we could directly write
   ```java
   return DataSetMetaInfo.withoutNumRegisteredPartitions(..).addShuffleDescriptors(...);
   ```
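
   A minimal, self-contained sketch of the chaining pattern suggested above. `FluentMetaInfoSketch` is a hypothetical stand-in, not the real `DataSetMetaInfo`, and plain strings stand in for `ShuffleDescriptor`; only the `return this` shape is the point.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   /** Hypothetical stand-in for DataSetMetaInfo; only the chaining pattern matters. */
   public final class FluentMetaInfoSketch {
       private final int numTotalPartitions;
       // Strings stand in for ShuffleDescriptor instances (assumption for illustration).
       private final List<String> descriptors = new ArrayList<>();

       private FluentMetaInfoSketch(int numTotalPartitions) {
           this.numTotalPartitions = numTotalPartitions;
       }

       public static FluentMetaInfoSketch withoutNumRegisteredPartitions(int numTotalPartitions) {
           return new FluentMetaInfoSketch(numTotalPartitions);
       }

       /** Returns this instance so the call can be chained after the factory method. */
       public FluentMetaInfoSketch addShuffleDescriptors(List<String> toAdd) {
           descriptors.addAll(toAdd);
           return this;
       }

       public int getNumTotalPartitions() {
           return numTotalPartitions;
       }

       public List<String> getDescriptors() {
           return descriptors;
       }

       public static void main(String[] args) {
           // Factory call and descriptor registration collapse into one expression.
           FluentMetaInfoSketch info =
                   FluentMetaInfoSketch.withoutNumRegisteredPartitions(2)
                           .addShuffleDescriptors(Arrays.asList("d0", "d1"));
           System.out.println(info.getDescriptors());
       }
   }
   ```

   With a `void` return type the caller needs a local variable and a separate statement before returning, which is the only thing the suggestion removes.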



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -244,7 +280,41 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
                 internalExecutionGraphAccessor.getPartitionLocationConstraint(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                clusterPartitionShuffleDescriptors);
+    }
+
+    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+            getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
+        final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+                executionVertex.getExecutionGraphAccessor();
+        final List<IntermediateDataSetID> consumedClusterDataSetIds =
+                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
+        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
+                new HashMap<>();
+
+        for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
+            List<? extends ShuffleDescriptor> shuffleDescriptors =
+                    internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(

Review Comment:
   In the long run, I think we could query the RM once for all the shuffle descriptors instead of once per data set. But this is an optimization and could be revised later.
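
   To illustrate the difference between the two lookup shapes, here is a rough sketch. `CountingService` is a hypothetical in-memory stand-in for the RM and is not a Flink API; strings stand in for `IntermediateDataSetID` and `ShuffleDescriptor`. The call counter only makes the number of round trips visible.

   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public final class BatchedLookupSketch {

       /** Hypothetical stand-in for the RM; counts how many requests it receives. */
       public static final class CountingService {
           private final Map<String, List<String>> store;
           private int calls = 0;

           public CountingService(Map<String, List<String>> store) {
               this.store = store;
           }

           /** One request per data set ID. */
           public List<String> getOne(String dataSetId) {
               calls++;
               return store.getOrDefault(dataSetId, Collections.emptyList());
           }

           /** One request for all data set IDs. */
           public Map<String, List<String>> getAll(Collection<String> dataSetIds) {
               calls++;
               Map<String, List<String>> result = new HashMap<>();
               for (String id : dataSetIds) {
                   result.put(id, store.getOrDefault(id, Collections.emptyList()));
               }
               return result;
           }

           public int getCalls() {
               return calls;
           }
       }

       /** Shape of the loop in this PR: one query per consumed data set. */
       public static Map<String, List<String>> perIdLookup(
               CountingService service, List<String> dataSetIds) {
           Map<String, List<String>> result = new HashMap<>();
           for (String id : dataSetIds) {
               result.put(id, service.getOne(id));
           }
           return result;
       }

       /** Shape of the possible follow-up: a single query for all consumed data sets. */
       public static Map<String, List<String>> batchedLookup(
               CountingService service, List<String> dataSetIds) {
           return service.getAll(dataSetIds);
       }
   }
   ```

   Both methods return the same map; the per-ID version issues one request per data set, while the batched version issues one request in total.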



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -137,6 +147,19 @@ private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors
                                     consumedIntermediateResult, consumedPartitionGroup)));
         }
 
+        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry :
+                consumedClusterPartitionShuffleDescriptors.entrySet()) {
+            // For FLIP-205, the JobGraph generating side ensure that the cluster partition is
+            // produced with only one subpartition. Therefore, we always consume the partition with
+            // subparition index of 0.

Review Comment:
   nit: typo, `subparition` -> `subpartition`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
