tillrohrmann commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r696758432
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitionGroups =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
Review comment:
```suggestion
partitionGroupReleaseStrategy.vertexFinished(finishedExecutionVertex);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
this.shuffleDescriptorCache.put(consumedPartitionGroup,
shuffleDescriptors);
}
- public void notifyPartitionChanged() {
- // When partitions change, the cache of shuffle descriptors is no
longer valid
- // and need to be removed.
- // Currently there are two scenarios:
+ public void notifyPartitionChanged(ConsumedPartitionGroup
consumedPartitionGroup) {
Review comment:
```suggestion
public void notifyPartitionGroupChanged(ConsumedPartitionGroup
consumedPartitionGroup) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
this.shuffleDescriptorCache.put(consumedPartitionGroup,
shuffleDescriptors);
}
- public void notifyPartitionChanged() {
- // When partitions change, the cache of shuffle descriptors is no
longer valid
- // and need to be removed.
- // Currently there are two scenarios:
+ public void notifyPartitionChanged(ConsumedPartitionGroup
consumedPartitionGroup) {
+ // When partitions change, the cache of shuffle descriptors for the
consumed partition group
Review comment:
```suggestion
// When partition groups change, the cache of shuffle descriptors
for the consumed partition group
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
##########
@@ -19,26 +19,25 @@
package
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import java.util.List;
/**
- * Interface for strategies that decide when to release {@link
IntermediateResultPartition
- * IntermediateResultPartitions}.
+ * Interface for strategies that decide when to release {@link
ConsumedPartitionGroup
+ * ConsumedPartitionGroups}.
*/
public interface PartitionReleaseStrategy {
Review comment:
```suggestion
public interface PartitionGroupReleaseStrategy {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
this.shuffleDescriptorCache.put(consumedPartitionGroup,
shuffleDescriptors);
}
- public void notifyPartitionChanged() {
- // When partitions change, the cache of shuffle descriptors is no
longer valid
- // and need to be removed.
- // Currently there are two scenarios:
+ public void notifyPartitionChanged(ConsumedPartitionGroup
consumedPartitionGroup) {
Review comment:
Somehow the name `notifyPartition(Group)Changed` does not really
convey what this method actually does. Maybe something like
`clearCachedInformationForPartitionGroup` would better describe what this
method does.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitionGroups =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
- releasePartitions(releasablePartitions);
+ releasePartitionGroups(releasablePartitionGroups);
} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}
- private void releasePartitions(final List<IntermediateResultPartitionID>
releasablePartitions) {
- if (releasablePartitions.size() > 0) {
+ private void releasePartitionGroups(
+ final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+ if (releasablePartitionGroups.size() > 0) {
// Remove cached ShuffleDescriptor when partition is released
Review comment:
```suggestion
// Remove cached ShuffleDescriptor when partition group is
released
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitionGroups =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
- releasePartitions(releasablePartitions);
+ releasePartitionGroups(releasablePartitionGroups);
} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}
- private void releasePartitions(final List<IntermediateResultPartitionID>
releasablePartitions) {
- if (releasablePartitions.size() > 0) {
+ private void releasePartitionGroups(
+ final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+ if (releasablePartitionGroups.size() > 0) {
// Remove cached ShuffleDescriptor when partition is released
- releasablePartitions.stream()
-
.map(IntermediateResultPartitionID::getIntermediateDataSetID)
- .distinct()
- .map(intermediateResults::get)
- .forEach(IntermediateResult::notifyPartitionChanged);
-
- final List<ResultPartitionID> partitionIds =
- releasablePartitions.stream()
+ for (ConsumedPartitionGroup releasablePartitionGroup :
releasablePartitionGroups) {
+ IntermediateResult totalResult =
+ checkNotNull(
+ intermediateResults.get(
+ releasablePartitionGroup
+ .getFirst()
+ .getIntermediateDataSetID()));
Review comment:
If all partitions of a `ConsumedPartitionGroup` must belong to the same
`IntermediateDataSet`, then we should probably introduce a
`ConsumedPartitionGroup.getIntermediateDataSetID()` to make this contract
explicit. Additionally, we should add a state check (`checkState`) in the
constructor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]