codenohup commented on code in PR #2932:
URL: https://github.com/apache/celeborn/pull/2932#discussion_r1851261674
##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java:
##########
@@ -129,10 +129,6 @@ public ShuffleIOOwnerContext createShuffleIOOwnerContext(
nettyGroup.addGroup(METRIC_GROUP_INPUT));
}
- public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
- throw new FlinkRuntimeException("Not implemented yet.");
- }
-
public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
Review Comment:
Returning an empty list from `getPartitionsOccupyingLocalResources` may cause
the TaskManager to be released unexpectedly, and the shuffle data stored on
that TaskManager will be lost.
The `getPartitionsOccupyingLocalResources` method of `ShuffleEnvironment`
reports the partitions that still occupy local resources on the TaskManager,
and this determines whether the TaskManager can be released once it becomes
idle. If the upstream vertex has finished but the downstream vertex has not
started due to insufficient resources, the upstream vertex's TaskManager becomes
idle and will be released unexpectedly, even though it still holds the shuffle
data that the downstream vertex needs.
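A minimal, self-contained sketch of this failure mode, assuming a simplified release rule: an idle TaskManager is releasable only when the reported collection is empty. `IdleReleaseSketch`, `LocalResourceReporter` and `canReleaseIdleTaskManager` are hypothetical names, not Flink or Celeborn APIs, and partition IDs are plain strings instead of `ResultPartitionID`:

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified model of the idle-release decision; the names below
// are illustrative stand-ins, not Flink or Celeborn APIs.
final class IdleReleaseSketch {

    /** Stand-in for the one ShuffleEnvironment method relevant here (IDs as strings). */
    interface LocalResourceReporter {
        Collection<String> getPartitionsOccupyingLocalResources();
    }

    /** Assumed rule: an idle TaskManager is releasable only if nothing occupies local resources. */
    static boolean canReleaseIdleTaskManager(boolean idle, LocalResourceReporter env) {
        return idle && env.getPartitionsOccupyingLocalResources().isEmpty();
    }

    public static void main(String[] args) {
        // Upstream vertex finished and left a local partition behind; the downstream
        // vertex is waiting for resources, so this TaskManager runs no tasks.
        boolean idle = true;

        // Behavior under review: always report that nothing is held locally.
        LocalResourceReporter alwaysEmpty = () -> List.of();
        // Behavior suggested here: report the partition still held locally.
        LocalResourceReporter reportsLocal = () -> List.of("upstream-partition-0");

        System.out.println(canReleaseIdleTaskManager(idle, alwaysEmpty));  // true: released, data lost
        System.out.println(canReleaseIdleTaskManager(idle, reportsLocal)); // false: kept alive
    }
}
```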
IMO, the method should return `partitionManager#getUnreleasedPartitions`:
`ResultPartition`s created by the Flink framework (such as
`SortMergeResultPartition`) are registered with the `partitionManager`, whereas
the `ResultPartition` created by Celeborn (`RemoteShuffleResultPartition`) is
not registered with the `partitionManager`, because it does not occupy
TaskManager resources.
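A rough sketch of the suggested delegation, under stated assumptions: `ShuffleEnvironmentSketch` and `PartitionManagerSketch` are hypothetical, simplified stand-ins for the shuffle environment and Flink's partition manager (not their real signatures), and partition IDs are strings. Local, framework-created partitions are registered; the remote Celeborn partition is not, so only the former is reported:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for the shuffle environment and Flink's
// partition manager; partition IDs are plain strings instead of ResultPartitionID.
final class ShuffleEnvironmentSketch {
    private final PartitionManagerSketch partitionManager = new PartitionManagerSketch();

    /** Framework-created local partitions (e.g. SortMergeResultPartition) get registered. */
    void createLocalPartition(String id) {
        partitionManager.registerResultPartition(id);
    }

    /** A Celeborn RemoteShuffleResultPartition keeps data on remote workers. */
    void createRemotePartition(String id) {
        // Intentionally not registered with the partition manager.
    }

    void releasePartitionsLocally(Collection<String> ids) {
        ids.forEach(partitionManager::releasePartition);
    }

    /** Suggested behavior: delegate to the partition manager's unreleased partitions. */
    Collection<String> getPartitionsOccupyingLocalResources() {
        return partitionManager.getUnreleasedPartitions();
    }

    public static void main(String[] args) {
        ShuffleEnvironmentSketch env = new ShuffleEnvironmentSketch();
        env.createRemotePartition("celeborn-p0");
        env.createLocalPartition("sortmerge-p1");
        System.out.println(env.getPartitionsOccupyingLocalResources()); // [sortmerge-p1]

        env.releasePartitionsLocally(List.of("sortmerge-p1"));
        System.out.println(env.getPartitionsOccupyingLocalResources()); // []
    }
}

final class PartitionManagerSketch {
    private final Set<String> registered = new LinkedHashSet<>();

    synchronized void registerResultPartition(String id) {
        registered.add(id);
    }

    synchronized void releasePartition(String id) {
        registered.remove(id);
    }

    /** Returns a snapshot copy so callers never iterate the live set. */
    synchronized Collection<String> getUnreleasedPartitions() {
        return new ArrayList<>(registered);
    }
}
```

With this shape, a job that uses only Celeborn partitions still reports an empty collection, so its idle TaskManagers remain releasable.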
--
This is an automated message from the Apache Git Service.