zhuzhurk commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r695472448
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
##########
@@ -36,9 +36,9 @@
* Calling this method informs the strategy that a vertex finished.
*
* @param finishedVertex Id of the vertex that finished the execution
- * @return A list of result partitions that can be released
+ * @return A list of {@link ConsumedPartitionGroup}s that can be released
Review comment:
The Javadoc of this class needs to be updated as well.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution attempt) {
final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitions =
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
releasePartitions(releasablePartitions);
} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}
- private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) {
- if (releasablePartitions.size() > 0) {
+ private void releasePartitions(final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+ if (releasablePartitionGroups.size() > 0) {
// Remove cached ShuffleDescriptor when partition is released
- releasablePartitions.stream()
+ releasablePartitionGroups.stream()
+ .map(ConsumedPartitionGroup::getFirst)
.map(IntermediateResultPartitionID::getIntermediateDataSetID)
.distinct()
.map(intermediateResults::get)
.forEach(IntermediateResult::notifyPartitionChanged);
- final List<ResultPartitionID> partitionIds =
- releasablePartitions.stream()
- .map(this::createResultPartitionId)
- .collect(Collectors.toList());
+ final List<ResultPartitionID> releasablePartitionIds = new ArrayList<>();
Review comment:
This can be simplified with a Java stream; see the other comment.
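A rough, self-contained sketch of what that could look like here: flatten the groups and map each partition to its ID in one pipeline instead of filling an `ArrayList` by hand. `ReleasableIdsSketch`, `collectIds`, and the `toId` function are hypothetical stand-ins (the latter for `this::createResultPartitionId`), and integers and strings stand in for the Flink ID types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class ReleasableIdsSketch {

    // Flattens every group and maps each element to its ID in a single pipeline.
    // Hypothetical helper, not part of Flink.
    public static <P, R> List<R> collectIds(
            List<? extends Iterable<P>> groups, Function<P, R> toId) {
        return groups.stream()
                // Turn each Iterable group into a sequential stream of its elements.
                .flatMap(group -> StreamSupport.stream(group.spliterator(), false))
                .map(toId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> groups =
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        // prints [partition-1, partition-2, partition-3]
        System.out.println(collectIds(groups, n -> "partition-" + n));
    }
}
```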
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
##########
@@ -136,7 +138,24 @@ public void toggleVertexFinishedUnfinished() {
regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2);
final List<IntermediateResultPartitionID> partitionsToRelease =
- regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+ getReleasablePartitions(regionPartitionReleaseStrategy, consumerVertex1);
assertThat(partitionsToRelease, is(empty()));
}
+
+ private static List<IntermediateResultPartitionID> getReleasablePartitions(
+ final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy,
+ final ExecutionVertexID finishedVertex) {
+
+ final List<IntermediateResultPartitionID> releasablePartitions = new ArrayList<>();
Review comment:
Maybe simplify it with a Java stream? For example:
```
return regionPartitionReleaseStrategy.vertexFinished(finishedVertex).stream()
.flatMap(IterableUtils::toStream)
.collect(Collectors.toList());
```
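To illustrate the idea outside of Flink, here is a minimal sketch of the same flattening, assuming each group is simply an `Iterable` of partition IDs. The local `toStream` helper is a stand-in for `IterableUtils::toStream`, and plain strings stand in for `IntermediateResultPartitionID`; the class and method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FlattenGroupsSketch {

    // Local stand-in for IterableUtils::toStream (an assumption, not the Flink utility).
    public static <T> Stream<T> toStream(Iterable<T> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    // Flattens a list of groups into one list of their elements, preserving order.
    public static <T> List<T> flatten(List<? extends Iterable<T>> groups) {
        return groups.stream()
                .flatMap(FlattenGroupsSketch::toStream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> groups =
                Arrays.asList(Arrays.asList("p0", "p1"), Arrays.asList("p2"));
        // prints [p0, p1, p2]
        System.out.println(flatten(groups));
    }
}
```

With this shape the test helper becomes a single return statement, and the temporary `ArrayList` is no longer needed.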
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution attempt) {
final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();
if (attempt.getState() == ExecutionState.FINISHED) {
- final List<IntermediateResultPartitionID> releasablePartitions =
+ final List<ConsumedPartitionGroup> releasablePartitions =
Review comment:
Rename `releasablePartitions` -> `releasablePartitionGroups`, since the list now holds partition groups.