pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273042838
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##########
 @@ -1003,26 +1008,37 @@ private void checkTaskManagerTimeouts() {
                if (!taskManagerRegistrations.isEmpty()) {
                        long currentTime = System.currentTimeMillis();
 
-                       ArrayList<InstanceID> timedOutTaskManagerIds = new ArrayList<>(taskManagerRegistrations.size());
+                       ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
 
                        // first retrieve the timed out TaskManagers
                        for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
                                if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
                                        // we collect the instance ids first in order to avoid concurrent modifications by the
                                        // ResourceActions.releaseResource call
-                                       timedOutTaskManagerIds.add(taskManagerRegistration.getInstanceId());
+                                       timedOutTaskManagers.add(taskManagerRegistration);
                                }
                        }
 
                        // second we trigger the release resource callback which can decide upon the resource release
-                       final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
-                       for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) {
-                               LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
-                               resourceActions.releaseResource(timedOutTaskManagerId, cause);
+                       for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
+                               InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
+                               if (waitResultConsumedToRelease) {
+                                       // checking whether TaskManagers can be safely removed
+                                       taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
+                                               .thenRunAsync(() -> releaseTaskExecutor(timedOutTaskManagerId), mainThreadExecutor);
 
 Review comment:
   Regardless of whether the gateway reports that the task executor can be released, do we always want to release it? `thenRunAsync` ignores the result of `canBeReleased()`.
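
   To illustrate the concern, here is a minimal, self-contained sketch rather than the actual SlotManager code. It assumes `canBeReleased()` returns a `CompletableFuture<Boolean>`; the class `ReleaseCheckSketch`, its helper methods, and the counter standing in for `releaseTaskExecutor` are all hypothetical. It contrasts `thenRunAsync`, which drops the future's value, with `thenAcceptAsync`, which can check it before releasing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseCheckSketch {

    // Runs callbacks on the calling thread so the sketch is deterministic.
    // (Stand-in for mainThreadExecutor; an assumption for illustration.)
    private static final Executor DIRECT = Runnable::run;

    // Hypothetical stand-in for TaskExecutorGateway.canBeReleased(),
    // assumed here to return CompletableFuture<Boolean>.
    static CompletableFuture<Boolean> canBeReleased(boolean answer) {
        return CompletableFuture.completedFuture(answer);
    }

    // Mirrors the diff: thenRunAsync takes a Runnable, so the Boolean
    // result is never inspected and the release always happens.
    static int releasesWithThenRun(boolean answer) {
        AtomicInteger releases = new AtomicInteger();
        canBeReleased(answer).thenRunAsync(releases::incrementAndGet, DIRECT);
        return releases.get();
    }

    // Possible alternative: thenAcceptAsync receives the Boolean, so the
    // release can be skipped when the task executor is not releasable.
    static int releasesWithThenAccept(boolean answer) {
        AtomicInteger releases = new AtomicInteger();
        canBeReleased(answer).thenAcceptAsync(releasable -> {
            if (releasable) {
                releases.incrementAndGet();
            }
        }, DIRECT);
        return releases.get();
    }

    public static void main(String[] args) {
        System.out.println("thenRunAsync, not releasable:    " + releasesWithThenRun(false));
        System.out.println("thenAcceptAsync, not releasable: " + releasesWithThenAccept(false));
        System.out.println("thenAcceptAsync, releasable:     " + releasesWithThenAccept(true));
    }
}
```

   With a `false` answer, the `thenRunAsync` variant still counts one release, while the `thenAcceptAsync` variant counts none. Whether skipping the release is the intended behavior is for the PR author to confirm.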
