zentol commented on a change in pull request #8687: [FLINK-12612][coordination] 
Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r295830911
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##########
 @@ -1407,6 +1404,23 @@ private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
                localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
        }
 
+       private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) {
+               // check whether we still have allocated slots for the same job
+               if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty() && !shuffleEnvironment.hasPartitionsOccupyingLocalResources(jobId)) {
+                       // we can remove the job from the job leader service
+                       try {
+                               jobLeaderService.removeJob(jobId);
 
 Review comment:
   I'm not sure; I just delayed all operations that were done before.
   
   For the time being we could probably remove the job from the 
jobLeaderService once the last slot is freed. After all, if a jobmaster 
failover occurs we remove all partitions anyway.
   
   Long-term though, once we introduce a grace-period for reconnects and/or 
failovers, I would've expected that we want to continue to observe for leader 
changes.
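
   To make the two options concrete, here is a minimal sketch and not the 
actual TaskExecutor code. `JobConnectionSketch`, its fields, and the 
`waitForPartitions` flag are hypothetical stand-ins for the slot table, the 
shuffle environment, and the job leader service. With the flag set to false 
the job is dropped once the last slot is freed; with it set to true the job 
stays monitored until its partitions are released as well, which is what the 
diff above does.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the bookkeeping discussed above; not Flink's TaskExecutor.
class JobConnectionSketch {
    // true: also wait for stored partitions before dropping the job (as in the diff);
    // false: drop the job as soon as the last slot is freed (short-term option).
    private final boolean waitForPartitions;
    private final Map<String, Integer> slots = new HashMap<>();
    private final Map<String, Integer> partitions = new HashMap<>();
    // Jobs whose leader is still being observed (stand-in for the job leader service).
    private final Set<String> monitoredJobs = new HashSet<>();

    JobConnectionSketch(boolean waitForPartitions) {
        this.waitForPartitions = waitForPartitions;
    }

    void addJob(String jobId, int slotCount, int partitionCount) {
        slots.put(jobId, slotCount);
        partitions.put(jobId, partitionCount);
        monitoredJobs.add(jobId);
    }

    void freeSlot(String jobId) {
        slots.merge(jobId, -1, Integer::sum);
        removeJobIfNoResources(jobId);
    }

    void releasePartitions(String jobId) {
        partitions.put(jobId, 0);
        removeJobIfNoResources(jobId);
    }

    boolean isMonitored(String jobId) {
        return monitoredJobs.contains(jobId);
    }

    private void removeJobIfNoResources(String jobId) {
        boolean noSlots = slots.getOrDefault(jobId, 0) <= 0;
        boolean noPartitions = partitions.getOrDefault(jobId, 0) <= 0;
        if (noSlots && (!waitForPartitions || noPartitions)) {
            monitoredJobs.remove(jobId);
        }
    }

    public static void main(String[] args) {
        JobConnectionSketch sketch = new JobConnectionSketch(true);
        sketch.addJob("job-1", 1, 2);
        sketch.freeSlot("job-1");
        System.out.println("monitored after last slot freed: " + sketch.isMonitored("job-1"));
        sketch.releasePartitions("job-1");
        System.out.println("monitored after partitions released: " + sketch.isMonitored("job-1"));
    }
}
```

   The sketch leaves out the grace period for reconnects and failovers; 
it only shows when leader observation stops under each condition.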

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
