zentol commented on a change in pull request #8687: [FLINK-12612][coordination]
Track stored partition on the TaskExecutor
URL: https://github.com/apache/flink/pull/8687#discussion_r295830911
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1407,6 +1404,23 @@ private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
         localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
     }
+    private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) {
+        // check whether we still have allocated slots for the same job
+        if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty() && !shuffleEnvironment.hasPartitionsOccupyingLocalResources(jobId)) {
+            // we can remove the job from the job leader service
+            try {
+                jobLeaderService.removeJob(jobId);
Review comment:
I'm not sure; I've just delayed all the operations that were done before.
For the time being we could probably remove the job from the
jobLeaderService once the last slot is freed. After all, if a JobMaster
failover occurs we remove all partitions anyway.
Long-term though, once we introduce a grace period for reconnects and/or
failovers, I would expect that we want to keep observing leader
changes.
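To make the two options concrete, here is a minimal, self-contained sketch. It is not Flink code: `JobConnectionTracker` and all of its methods are hypothetical stand-ins for the TaskExecutor, taskSlotTable, shuffleEnvironment and jobLeaderService, and job IDs are plain strings. The flag `keepWhilePartitionsExist` is an assumption used only to switch between "remove once the last slot is freed" and "keep observing while partitions are still stored".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the removal condition; not part of Flink. */
class JobConnectionTracker {
    private final Map<String, Integer> allocatedSlots = new HashMap<>();
    private final Map<String, Integer> storedPartitions = new HashMap<>();
    // Jobs whose leader we are still observing (stand-in for jobLeaderService).
    private final Set<String> monitoredJobs = new HashSet<>();
    private final boolean keepWhilePartitionsExist;

    JobConnectionTracker(boolean keepWhilePartitionsExist) {
        this.keepWhilePartitionsExist = keepWhilePartitionsExist;
    }

    void allocateSlot(String jobId) {
        allocatedSlots.merge(jobId, 1, Integer::sum);
        monitoredJobs.add(jobId);
    }

    void storePartition(String jobId) {
        storedPartitions.merge(jobId, 1, Integer::sum);
    }

    void freeSlot(String jobId) {
        decrement(allocatedSlots, jobId);
        closeIfNoResources(jobId);
    }

    void releasePartition(String jobId) {
        decrement(storedPartitions, jobId);
        closeIfNoResources(jobId);
    }

    boolean isMonitored(String jobId) {
        return monitoredJobs.contains(jobId);
    }

    private void closeIfNoResources(String jobId) {
        boolean noSlots = !allocatedSlots.containsKey(jobId);
        boolean noPartitions = !storedPartitions.containsKey(jobId);
        // Short-term option: partitions are ignored, the last freed slot ends monitoring.
        // Long-term option: monitoring continues until slots AND partitions are gone.
        if (noSlots && (noPartitions || !keepWhilePartitionsExist)) {
            monitoredJobs.remove(jobId);
        }
    }

    // Decrements the counter and drops the entry when it reaches zero.
    private static void decrement(Map<String, Integer> counts, String jobId) {
        counts.computeIfPresent(jobId, (id, n) -> n > 1 ? n - 1 : null);
    }
}
```

With `keepWhilePartitionsExist = false` the job stops being monitored as soon as its last slot is freed, even if partitions remain; with `true` it stays monitored until the last partition is released as well, which is roughly what the condition in the diff expresses.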