Thesharing commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r695574774
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
##########
@@ -136,7 +138,24 @@ public void toggleVertexFinishedUnfinished() {
regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2);
final List<IntermediateResultPartitionID> partitionsToRelease =
- regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+ getReleasablePartitions(regionPartitionReleaseStrategy,
consumerVertex1);
assertThat(partitionsToRelease, is(empty()));
}
+
+ private static List<IntermediateResultPartitionID> getReleasablePartitions(
+ final RegionPartitionReleaseStrategy
regionPartitionReleaseStrategy,
+ final ExecutionVertexID finishedVertex) {
+
+ final List<IntermediateResultPartitionID> releasablePartitions = new
ArrayList<>();
Review comment:
Thank you for providing a better implementation. I just ignored the
`IterableUtils` can be used here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
##########
@@ -158,25 +159,122 @@ private void testCacheRemovedCorrectlyAfterFailover(
assertEquals(PARALLELISM, shuffleDescriptors.length);
assertEquals(expectedBefore, blobWriter.numberOfBlobs());
- triggerExceptionAndComplete(scheduler, v1);
+ triggerGlobalFailoverAndComplete(scheduler, v1);
ioExecutor.triggerAll();
// Cache should be removed during ExecutionVertex#resetForNewExecution
assertNull(getConsumedCachedShuffleDescriptor(executionGraph, v2));
assertEquals(expectedAfter, blobWriter.numberOfBlobs());
}
- private static DefaultScheduler createSchedulerAndDeploy(
+ @Test
+ public void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished()
throws Exception {
+ testRemoveCacheForPointwiseEdgeAfterFinished(
+ new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
+ }
+
+ @Test
+ public void testRemoveOffloadedCacheForPointwiseEdgeAfterFinished() throws
Exception {
+ testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0),
7, 6);
Review comment:
Agreed. I've added a comment here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitions =
Review comment:
Resolved. Furthermore, should we rename the method
`releasablePartitions` to `releasablePartitionGroups`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
##########
@@ -36,9 +36,9 @@
* Calling this method informs the strategy that a vertex finished.
*
* @param finishedVertex Id of the vertex that finished the execution
- * @return A list of result partitions that can be released
+ * @return A list of {@link ConsumedPartitionGroup}s that can be released
Review comment:
Thank you for pointing this out.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution
attempt) {
final ExecutionVertexID finishedExecutionVertex =
attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitions =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
releasePartitions(releasablePartitions);
} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}
- private void releasePartitions(final List<IntermediateResultPartitionID>
releasablePartitions) {
- if (releasablePartitions.size() > 0) {
+ private void releasePartitions(final List<ConsumedPartitionGroup>
releasablePartitionGroups) {
+ if (releasablePartitionGroups.size() > 0) {
// Remove cached ShuffleDescriptor when partition is released
- releasablePartitions.stream()
+ releasablePartitionGroups.stream()
+ .map(ConsumedPartitionGroup::getFirst)
.map(IntermediateResultPartitionID::getIntermediateDataSetID)
.distinct()
.map(intermediateResults::get)
.forEach(IntermediateResult::notifyPartitionChanged);
- final List<ResultPartitionID> partitionIds =
- releasablePartitions.stream()
- .map(this::createResultPartitionId)
- .collect(Collectors.toList());
+ final List<ResultPartitionID> releasablePartitionIds = new
ArrayList<>();
Review comment:
Resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]