Sxnan commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r874416498


##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -45,6 +56,9 @@ public class NettyShuffleMaster implements 
ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final Map<IntermediateDataSetID, 
Collection<NettyShuffleDescriptor>>

Review Comment:
   If I understand correctly, the life cycle of the `JobMasterPartitionTracker` 
is bound to a job. It cannot keep the mapping from `IntermediateDataSetID` to 
`ShuffleDescriptors` of the cluster partitions across jobs. 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -299,6 +302,21 @@ private void handleTaskFailure(
         final FailureHandlingResult failureHandlingResult =
                 executionFailureHandler.getFailureHandlingResult(
                         executionVertexId, error, timestamp);
+
+        // Notify shuffle master that the cached intermediate dataset is 
corrupted.
+        if (failureHandlingResult.getError() instanceof 
CacheCorruptedException) {

Review Comment:
   If I understand correctly, the `AdativeBatchScheduler` extends the 
`DefaultScheduler`. The `handleTaskFailure` method is called in 
`updateTaskExecutionStateInternal`. As long as the `AdativeBatchScheduler` and 
`AdaptiveScheduler` don't overwrite the method, it should be fine to only make 
change in `DefaultScheduler`. What do you think?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -481,8 +481,18 @@ public IntermediateDataSet createAndAddResultDataSet(
 
     public JobEdge connectNewDataSetAsInput(
             JobVertex input, DistributionPattern distPattern, 
ResultPartitionType partitionType) {
+        return this.connectNewDataSetAsInput(
+                input, distPattern, partitionType, new 
IntermediateDataSetID());
+    }
+
+    public JobEdge connectNewDataSetAsInput(
+            JobVertex input,
+            DistributionPattern distPattern,
+            ResultPartitionType partitionType,
+            IntermediateDataSetID intermediateDataSetID) {

Review Comment:
   Fixed



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -175,9 +194,27 @@ public JobVertex(String name, JobVertexID id) {
      * @param operatorIDPairs The operator ID pairs of the job vertex.
      */
     public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> 
operatorIDPairs) {
+        this(name, primaryId, operatorIDPairs, null);
+    }
+
+    /**
+     * Constructs a new job vertex and assigns it with the given name.
+     *
+     * @param name The name of the new job vertex.
+     * @param primaryId The id of the job vertex.
+     * @param operatorIDPairs The operator ID pairs of the job vertex.
+     * @param intermediateDataSetID The id of the cached intermediate dataset 
that the job vertex
+     *     consumes.
+     */
+    public JobVertex(

Review Comment:
   This is actually the IntermediateDataSetId of the cluster partitions to be 
consumed by the JobVertex. Thus, it cannot be replaced with the 
`JobVertex#createAndAddResultDataSet`.
   
   I change the field to a list. And we can add the `IntermediateDataSetId` by 
`JobVertex#addIntermediateDataSetIdToConsume`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -142,6 +142,12 @@ public class JobVertex implements java.io.Serializable {
      */
     private String resultOptimizerProperties;
 
+    /**
+     * Optional, the intermediateDataSetId of the cached intermediate dataset 
that the job vertex
+     * consumes.
+     */
+    @Nullable private final IntermediateDataSetID intermediateDataSetID;

Review Comment:
   Thanks for the comments. It doesn't hurt to make it a list of 
`IntermediateDataSet` for generality, although, for FLIP-205, we only support 
consuming one `IntermediateDataSet`.
   
   I have made it a list of `IntermediateDataSet`.
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java:
##########
@@ -42,6 +43,8 @@ public class DefaultSchedulingPipelinedRegion implements 
SchedulingPipelinedRegi
 
     private Set<ConsumedPartitionGroup> blockingConsumedPartitionGroups;
 
+    private Set<ConsumedPartitionGroup> persistentConsumedPartitionGroups;

Review Comment:
   Thanks for your comment. 
   
   `ResultPartitionType` is added to the ConsumedPartitionGroup and simplify 
the logic of 
`RegionPartitionGroupReleaseStrategy#filterReleasablePartitionGroups`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##########
@@ -84,6 +86,33 @@ CompletableFuture<T> registerPartitionWithProducer(
             PartitionDescriptor partitionDescriptor,
             ProducerDescriptor producerDescriptor);
 
+    /**
+     * Returns all the shuffle descriptors for the partitions in the 
intermediate data set with the
+     * given id.
+     *
+     * @param intermediateDataSetID The id of hte intermediate data set.

Review Comment:
   Fixed



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -85,7 +88,9 @@ private TaskDeploymentDescriptorFactory(
             List<ConsumedPartitionGroup> consumedPartitionGroups,
             Function<IntermediateResultPartitionID, 
IntermediateResultPartition>
                     resultPartitionRetriever,
-            BlobWriter blobWriter) {
+            BlobWriter blobWriter,
+            IntermediateDataSetID cachedIntermediateDataSetID,
+            Collection<? extends ShuffleDescriptor> 
clusterPartitionShuffleDescriptors) {

Review Comment:
   Fixed



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -253,7 +300,9 @@ public static TaskDeploymentDescriptorFactory 
fromExecutionVertex(
                 executionVertex.getParallelSubtaskIndex(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                intermediateDataSetID,

Review Comment:
   It is removed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -241,6 +271,23 @@ public static TaskDeploymentDescriptorFactory 
fromExecutionVertex(
             ExecutionVertex executionVertex, int attemptNumber) throws 
IOException {
         InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
+        final IntermediateDataSetID intermediateDataSetID =

Review Comment:
   It is renamed to consumedClusterDataSetId



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -253,7 +300,9 @@ public static TaskDeploymentDescriptorFactory 
fromExecutionVertex(
                 executionVertex.getParallelSubtaskIndex(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                intermediateDataSetID,
+                clusterPartitionShuffleDescriptors);

Review Comment:
   I think we can filter the shuffleDescriptor here to have the processing 
logic in one place. 
   
   However, the ShuffleDescriptors of the ClusterPartitions are returned by the 
`ShuffleMaster#getClusterPartitionShuffleDescriptors`, which doesn't guarantee 
`clusterPartitionShuffleDescriptors[subtaskIndex]` is the shuffle descriptor 
for the subtask.
   
   Or we can let the `ShuffleMaster#getClusterPartitionShuffleDescriptors` 
return a list of clusterPartitionShuffleDescriptors ordered by the partition 
number. Then it is up to the ShuffleMaster to ensure the assumption holds.
   
   What do you think?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -241,6 +271,23 @@ public static TaskDeploymentDescriptorFactory 
fromExecutionVertex(
             ExecutionVertex executionVertex, int attemptNumber) throws 
IOException {
         InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
+        final IntermediateDataSetID intermediateDataSetID =
+                
executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIDToConsume();
+        Collection<? extends ShuffleDescriptor> 
clusterPartitionShuffleDescriptors = null;

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to