Thesharing commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r697127383
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
this.shuffleDescriptorCache.put(consumedPartitionGroup,
shuffleDescriptors);
}
- public void notifyPartitionChanged() {
- // When partitions change, the cache of shuffle descriptors is no
longer valid
- // and need to be removed.
- // Currently there are two scenarios:
+ public void notifyPartitionChanged(ConsumedPartitionGroup
consumedPartitionGroup) {
Review comment:
I agree. If in the future there are more things to do when a partition
group is changed, we can add a new method that invokes this method
`clearCachedInformationForConsumedPartitionGroup`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitionGroups =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitionGroups =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
- releasePartitions(releasablePartitions);
+ releasePartitionGroups(releasablePartitionGroups);
} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}
- private void releasePartitions(final List<IntermediateResultPartitionID>
releasablePartitions) {
- if (releasablePartitions.size() > 0) {
+ private void releasePartitionGroups(
+ final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+ if (releasablePartitionGroups.size() > 0) {
// Remove cached ShuffleDescriptor when partition is released
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
##########
@@ -19,26 +19,25 @@
package
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import java.util.List;
/**
- * Interface for strategies that decide when to release {@link
IntermediateResultPartition
- * IntermediateResultPartitions}.
+ * Interface for strategies that decide when to release {@link
ConsumedPartitionGroup
+ * ConsumedPartitionGroups}.
*/
public interface PartitionReleaseStrategy {
Review comment:
Since the name of `PartitionReleaseStrategy ` is changed, I also change
the name of `RegionPartitionReleaseStrategy`,
`NotReleasingPartitionReleaseStrategy`, `PartitionReleaseStrategyFactoryLoader`
and their tests. This PR seems to be large now.😂
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1268,37 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitionGroups =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
- releasePartitions(releasablePartitions);
+ releasePartitionGroups(releasablePartitionGroups);
} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}
- private void releasePartitions(final List<IntermediateResultPartitionID>
releasablePartitions) {
- if (releasablePartitions.size() > 0) {
+ private void releasePartitionGroups(
+ final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+
+ if (releasablePartitionGroups.size() > 0) {
// Remove cached ShuffleDescriptor when partition is released
- releasablePartitions.stream()
-
.map(IntermediateResultPartitionID::getIntermediateDataSetID)
- .distinct()
- .map(intermediateResults::get)
- .forEach(IntermediateResult::notifyPartitionChanged);
-
- final List<ResultPartitionID> partitionIds =
- releasablePartitions.stream()
+ for (ConsumedPartitionGroup releasablePartitionGroup :
releasablePartitionGroups) {
+ IntermediateResult totalResult =
+ checkNotNull(
+ intermediateResults.get(
+ releasablePartitionGroup
+ .getFirst()
+ .getIntermediateDataSetID()));
Review comment:
This makes the invocation here much clearer, thank you, @tillrohrmann!
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -173,28 +172,22 @@ public void cacheShuffleDescriptors(
this.shuffleDescriptorCache.put(consumedPartitionGroup,
shuffleDescriptors);
}
- public void notifyPartitionChanged() {
- // When partitions change, the cache of shuffle descriptors is no
longer valid
- // and need to be removed.
- // Currently there are two scenarios:
+ public void notifyPartitionChanged(ConsumedPartitionGroup
consumedPartitionGroup) {
+ // When partitions change, the cache of shuffle descriptors for the
consumed partition group
Review comment:
Resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]