Thesharing commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r697127383



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
         this.shuffleDescriptorCache.put(consumedPartitionGroup, 
shuffleDescriptors);
     }
 
-    public void notifyPartitionChanged() {
-        // When partitions change, the cache of shuffle descriptors is no 
longer valid
-        // and need to be removed.
-        // Currently there are two scenarios:
+    public void notifyPartitionChanged(ConsumedPartitionGroup 
consumedPartitionGroup) {

Review comment:
       I agree. If more needs to be done in the future when a partition 
group changes, we can add a new method that invokes 
`clearCachedInformationForConsumedPartitionGroup`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution 
attempt) {
         final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitionGroups =
                     
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);

Review comment:
       Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution 
attempt) {
         final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitionGroups =
                     
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
-            releasePartitions(releasablePartitions);
+            releasePartitionGroups(releasablePartitionGroups);
         } else {
             partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
         }
     }
 
-    private void releasePartitions(final List<IntermediateResultPartitionID> 
releasablePartitions) {
-        if (releasablePartitions.size() > 0) {
+    private void releasePartitionGroups(
+            final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+        if (releasablePartitionGroups.size() > 0) {
 
             // Remove cached ShuffleDescriptor when partition is released

Review comment:
       Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
##########
@@ -19,26 +19,25 @@
 
 package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
 
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 
 import java.util.List;
 
 /**
- * Interface for strategies that decide when to release {@link 
IntermediateResultPartition
- * IntermediateResultPartitions}.
+ * Interface for strategies that decide when to release {@link 
ConsumedPartitionGroup
+ * ConsumedPartitionGroups}.
  */
 public interface PartitionReleaseStrategy {

Review comment:
       Since the name of `PartitionReleaseStrategy` has changed, I also changed 
the names of `RegionPartitionReleaseStrategy`, 
`NotReleasingPartitionReleaseStrategy`, `PartitionReleaseStrategyFactoryLoader` 
and their tests. This PR seems to be large now. 😂

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution 
attempt) {
         final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitionGroups =
                     
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
-            releasePartitions(releasablePartitions);
+            releasePartitionGroups(releasablePartitionGroups);
         } else {
             partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
         }
     }
 
-    private void releasePartitions(final List<IntermediateResultPartitionID> 
releasablePartitions) {
-        if (releasablePartitions.size() > 0) {
+    private void releasePartitionGroups(
+            final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+        if (releasablePartitionGroups.size() > 0) {
 
             // Remove cached ShuffleDescriptor when partition is released
-            releasablePartitions.stream()
-                    
.map(IntermediateResultPartitionID::getIntermediateDataSetID)
-                    .distinct()
-                    .map(intermediateResults::get)
-                    .forEach(IntermediateResult::notifyPartitionChanged);
-
-            final List<ResultPartitionID> partitionIds =
-                    releasablePartitions.stream()
+            for (ConsumedPartitionGroup releasablePartitionGroup : 
releasablePartitionGroups) {
+                IntermediateResult totalResult =
+                        checkNotNull(
+                                intermediateResults.get(
+                                        releasablePartitionGroup
+                                                .getFirst()
+                                                .getIntermediateDataSetID()));

Review comment:
       This makes the invocation here much clearer, thank you, @tillrohrmann!

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
         this.shuffleDescriptorCache.put(consumedPartitionGroup, 
shuffleDescriptors);
     }
 
-    public void notifyPartitionChanged() {
-        // When partitions change, the cache of shuffle descriptors is no 
longer valid
-        // and need to be removed.
-        // Currently there are two scenarios:
+    public void notifyPartitionChanged(ConsumedPartitionGroup 
consumedPartitionGroup) {
+        // When partitions change, the cache of shuffle descriptors for the 
consumed partition group

Review comment:
       Resolved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to