pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273042838
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
##########
@@ -1003,26 +1008,37 @@ private void checkTaskManagerTimeouts() {
     if (!taskManagerRegistrations.isEmpty()) {
         long currentTime = System.currentTimeMillis();
-        ArrayList<InstanceID> timedOutTaskManagerIds = new ArrayList<>(taskManagerRegistrations.size());
+        ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
         // first retrieve the timed out TaskManagers
         for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
             if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
                 // we collect the instance ids first in order to avoid concurrent modifications by the
                 // ResourceActions.releaseResource call
-                timedOutTaskManagerIds.add(taskManagerRegistration.getInstanceId());
+                timedOutTaskManagers.add(taskManagerRegistration);
             }
         }
         // second we trigger the release resource callback which can decide upon the resource release
-        final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
-        for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) {
-            LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
-            resourceActions.releaseResource(timedOutTaskManagerId, cause);
+        for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
+            InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
+            if (waitResultConsumedToRelease) {
+                // checking whether TaskManagers can be safely removed
+                taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
+                    .thenRunAsync(() -> releaseTaskExecutor(timedOutTaskManagerId), mainThreadExecutor);
Review comment:
Regardless of whether the gateway says the TaskExecutor can be released or not, do we always want to release the task executor?
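To illustrate the concern: `thenRunAsync` takes a `Runnable`, so it discards whatever value the future completes with. Assuming `canBeReleased()` completes with a `Boolean`, the release runs even when that value is `false`. The sketch below contrasts this with `thenAcceptAsync`, which receives the value and can release conditionally. The class name, the `String` ids, and the stand-in `releaseTaskExecutor` are hypothetical placeholders, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ReleaseCheckSketch {

    // Hypothetical record of which TaskExecutor ids were released.
    static final List<String> released = new ArrayList<>();

    // Hypothetical stand-in for releaseTaskExecutor(InstanceID) from the diff.
    static void releaseTaskExecutor(String id) {
        released.add(id);
    }

    // Pattern from the diff: the Runnable never sees the Boolean,
    // so the TaskExecutor is released even if canBeReleased() yields false.
    static CompletableFuture<Void> releaseIgnoringResult(
            CompletableFuture<Boolean> canBeReleased, String id, Executor executor) {
        return canBeReleased.thenRunAsync(() -> releaseTaskExecutor(id), executor);
    }

    // Alternative: the Consumer receives the Boolean and releases only on true.
    static CompletableFuture<Void> releaseIfAllowed(
            CompletableFuture<Boolean> canBeReleased, String id, Executor executor) {
        return canBeReleased.thenAcceptAsync(canRelease -> {
            if (canRelease) {
                releaseTaskExecutor(id);
            }
        }, executor);
    }

    public static void main(String[] args) {
        // Direct executor so every callback runs synchronously on the calling thread.
        Executor direct = Runnable::run;

        releaseIgnoringResult(CompletableFuture.completedFuture(false), "tm-1", direct).join();
        releaseIfAllowed(CompletableFuture.completedFuture(false), "tm-2", direct).join();
        releaseIfAllowed(CompletableFuture.completedFuture(true), "tm-3", direct).join();

        System.out.println(released); // prints [tm-1, tm-3]
    }
}
```

Here `tm-1` is released despite the `false` answer, while `tm-2` is kept. If keeping the TaskExecutor is the intended behaviour when it still holds unconsumed partitions, the callback needs to inspect the result rather than just run.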