gaoyunhaii commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r892215954
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##########
@@ -18,16 +18,25 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.util.Preconditions;
+import java.util.Comparator;
+import java.util.Map;
import java.util.OptionalInt;
+import java.util.SortedMap;
+import java.util.TreeMap;
/** Container for meta-data of a data set. */
public final class DataSetMetaInfo {
private static final int UNKNOWN = -1;
private final int numRegisteredPartitions;
private final int numTotalPartitions;
+ private final SortedMap<ResultPartitionID, ShuffleDescriptor>
+ shuffleDescriptorsOrderByPartitionNumber =
Review Comment:
nit: `shuffleDescriptorsOrderByPartitionId` ?
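For illustration only, the renamed field might look roughly like this (the comparator on the partition number is my assumption, not necessarily the exact code in this PR):
```java
// Sketch: keep the descriptors ordered by the partition number encoded in the
// IntermediateResultPartitionID, under the suggested field name.
private final SortedMap<ResultPartitionID, ShuffleDescriptor>
        shuffleDescriptorsOrderByPartitionId =
                new TreeMap<>(
                        Comparator.comparingInt(
                                (ResultPartitionID id) ->
                                        id.getPartitionId().getPartitionNumber()));
```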
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -481,8 +486,18 @@ public IntermediateDataSet createAndAddResultDataSet(
     public JobEdge connectNewDataSetAsInput(
             JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
+        return this.connectNewDataSetAsInput(
+                input, distPattern, partitionType, new IntermediateDataSetID());
+    }
+
+    public JobEdge connectNewDataSetAsInput(
+            JobVertex input,
+            DistributionPattern distPattern,
+            ResultPartitionType partitionType,
+            IntermediateDataSetID intermediateDataSetId) {
-        IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
+        IntermediateDataSet dataSet =
+                input.createAndAddResultDataSet(intermediateDataSetId, partitionType);
Review Comment:
The originally called method `createAndAddResultDataSet(ResultPartitionType partitionType)` seems to be unused now.
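If the one-argument overload is meant to stay, it could simply delegate to the new one (just a sketch, reusing the parameter order from the diff above); otherwise it can probably be removed:
```java
public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
    return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
}
```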
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -481,8 +486,18 @@ public IntermediateDataSet createAndAddResultDataSet(
     public JobEdge connectNewDataSetAsInput(
             JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
+        return this.connectNewDataSetAsInput(
Review Comment:
Do not use `this.` except in constructors.
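i.e. the delegation would simply read (same arguments as in the diff, just without `this.`):
```java
return connectNewDataSetAsInput(
        input, distPattern, partitionType, new IntermediateDataSetID());
```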
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##########
@@ -44,6 +53,15 @@ public int getNumTotalPartitions() {
return numTotalPartitions;
}
+ public void addShuffleDescriptors(
Review Comment:
nit: might also return `DataSetMetaInfo`, so that we could directly write:
```java
return DataSetMetaInfo.withoutNumRegisteredPartitions(..).addShuffleDescriptors(...);
```
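A rough sketch of that fluent variant, with the parameter type assumed to be a `Map` and the field name taken from the diff above:
```java
public DataSetMetaInfo addShuffleDescriptors(
        Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors) {
    shuffleDescriptorsOrderByPartitionNumber.putAll(shuffleDescriptors);
    return this;
}
```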
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -244,7 +280,41 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
                 internalExecutionGraphAccessor.getPartitionLocationConstraint(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                clusterPartitionShuffleDescriptors);
+    }
+
+    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+            getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
+        final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+                executionVertex.getExecutionGraphAccessor();
+        final List<IntermediateDataSetID> consumedClusterDataSetIds =
+                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
+        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
+                new HashMap<>();
+
+        for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
+            List<? extends ShuffleDescriptor> shuffleDescriptors =
+                    internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
Review Comment:
In the long run, I think we could query the RM just once for all the shuffle descriptors. But that is an optimization and could be revised later.
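Just to illustrate the idea (the batched accessor below is hypothetical and not part of the current `InternalExecutionGraphAccessor` API):
```java
// Hypothetical: one call fetching the descriptors of all consumed cluster data
// sets at once, instead of one RM query per IntermediateDataSetID.
Map<IntermediateDataSetID, Collection<? extends ShuffleDescriptor>> allDescriptors =
        internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
                consumedClusterDataSetIds);
```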
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -137,6 +147,19 @@ private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors
                                             consumedIntermediateResult,
                                             consumedPartitionGroup)));
         }
+        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry :
+                consumedClusterPartitionShuffleDescriptors.entrySet()) {
+            // For FLIP-205, the JobGraph generating side ensure that the cluster partition is
+            // produced with only one subpartition. Therefore, we always consume the partition with
+            // subparition index of 0.
Review Comment:
nit: subparition -> subpartition
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]