tillrohrmann commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r696758432



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution 
attempt) {
         final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitionGroups =
                     
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);

Review comment:
       ```suggestion
                       
partitionGroupReleaseStrategy.vertexFinished(finishedExecutionVertex);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
         this.shuffleDescriptorCache.put(consumedPartitionGroup, 
shuffleDescriptors);
     }
 
-    public void notifyPartitionChanged() {
-        // When partitions change, the cache of shuffle descriptors is no 
longer valid
-        // and need to be removed.
-        // Currently there are two scenarios:
+    public void notifyPartitionChanged(ConsumedPartitionGroup 
consumedPartitionGroup) {

Review comment:
       ```suggestion
       public void notifyPartitionGroupChanged(ConsumedPartitionGroup 
consumedPartitionGroup) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
         this.shuffleDescriptorCache.put(consumedPartitionGroup, 
shuffleDescriptors);
     }
 
-    public void notifyPartitionChanged() {
-        // When partitions change, the cache of shuffle descriptors is no 
longer valid
-        // and need to be removed.
-        // Currently there are two scenarios:
+    public void notifyPartitionChanged(ConsumedPartitionGroup 
consumedPartitionGroup) {
+        // When partitions change, the cache of shuffle descriptors for the 
consumed partition group

Review comment:
       ```suggestion
           // When partition groups change, the cache of shuffle descriptors 
for the consumed partition group
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
##########
@@ -19,26 +19,25 @@
 
 package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
 
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 
 import java.util.List;
 
 /**
- * Interface for strategies that decide when to release {@link 
IntermediateResultPartition
- * IntermediateResultPartitions}.
+ * Interface for strategies that decide when to release {@link 
ConsumedPartitionGroup
+ * ConsumedPartitionGroups}.
  */
 public interface PartitionReleaseStrategy {

Review comment:
       ```suggestion
   public interface PartitionGroupReleaseStrategy {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
         this.shuffleDescriptorCache.put(consumedPartitionGroup, 
shuffleDescriptors);
     }
 
-    public void notifyPartitionChanged() {
-        // When partitions change, the cache of shuffle descriptors is no 
longer valid
-        // and need to be removed.
-        // Currently there are two scenarios:
+    public void notifyPartitionChanged(ConsumedPartitionGroup 
consumedPartitionGroup) {

Review comment:
       Somehow the name `notifyPartition(Group)Changed` does not really 
convey what this method actually does. Maybe something like 
`clearCachedInformationForPartitionGroup` or similar would better describe 
what this method does.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution 
attempt) {
         final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitionGroups =
                     
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
-            releasePartitions(releasablePartitions);
+            releasePartitionGroups(releasablePartitionGroups);
         } else {
             partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
         }
     }
 
-    private void releasePartitions(final List<IntermediateResultPartitionID> 
releasablePartitions) {
-        if (releasablePartitions.size() > 0) {
+    private void releasePartitionGroups(
+            final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+        if (releasablePartitionGroups.size() > 0) {
 
             // Remove cached ShuffleDescriptor when partition is released

Review comment:
       ```suggestion
               // Remove cached ShuffleDescriptor when partition group is 
released
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution 
attempt) {
         final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitionGroups =
                     
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
-            releasePartitions(releasablePartitions);
+            releasePartitionGroups(releasablePartitionGroups);
         } else {
             partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
         }
     }
 
-    private void releasePartitions(final List<IntermediateResultPartitionID> 
releasablePartitions) {
-        if (releasablePartitions.size() > 0) {
+    private void releasePartitionGroups(
+            final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+        if (releasablePartitionGroups.size() > 0) {
 
             // Remove cached ShuffleDescriptor when partition is released
-            releasablePartitions.stream()
-                    
.map(IntermediateResultPartitionID::getIntermediateDataSetID)
-                    .distinct()
-                    .map(intermediateResults::get)
-                    .forEach(IntermediateResult::notifyPartitionChanged);
-
-            final List<ResultPartitionID> partitionIds =
-                    releasablePartitions.stream()
+            for (ConsumedPartitionGroup releasablePartitionGroup : 
releasablePartitionGroups) {
+                IntermediateResult totalResult =
+                        checkNotNull(
+                                intermediateResults.get(
+                                        releasablePartitionGroup
+                                                .getFirst()
+                                                .getIntermediateDataSetID()));

Review comment:
       If all partitions of a `ConsumedPartitionGroup` must belong to the same 
`IntermediateDataSet`, then we probably should introduce a 
`ConsumedPartitionGroup.getIntermediateDataSetID()` to make this contract 
explicit. Additionally, we should add a state check (e.g. `checkState`) in 
the constructor to enforce it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to