This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c762cec46be9328466fc8bdc87337c221eaa82e
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Thu Jul 4 16:12:32 2019 +0200

    [FLINK-12736][coordination] Release TaskExecutor in SlotManager only if 
there were no slot allocations after the partition check
    
    The ResourceManager looks out for TaskManagers that have not had any slots 
allocated on them for a while, as these could be released to save resources.
    If such a TM is found, the RM checks via an RPC call whether the TM still 
holds any partitions. If no partition is held then the TM is released.
    However, the RPC callback does not check whether the TM is still idle. 
In the meantime, a slot could have been allocated on the TM.
    Even if that slot has since been freed, the TM may now hold newly created 
partitions that are not reflected in the check result.
    
    To make sure no resources were allocated in between, we record the 
taskManagerRegistration.getIdleSince() timestamp before starting the async 'no 
partition' check.
    The TM is released only if its idle-since timestamp after the check still 
matches the recorded one. Otherwise, we skip the release and start over after 
the next timeout.
    
    This closes #8988.
---
 .../resourcemanager/slotmanager/SlotManager.java   | 28 +++++++++++++-------
 .../slotmanager/SlotManagerTest.java               | 30 +++++++++++++++++-----
 .../taskexecutor/TestingTaskExecutorGateway.java   |  6 ++---
 .../TestingTaskExecutorGatewayBuilder.java         |  4 +--
 4 files changed, 47 insertions(+), 21 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 571b5bc..d85aec5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -1023,22 +1023,32 @@ public class SlotManager implements AutoCloseable {
 
                        // second we trigger the release resource callback 
which can decide upon the resource release
                        for (TaskManagerRegistration taskManagerRegistration : 
timedOutTaskManagers) {
-                               InstanceID timedOutTaskManagerId = 
taskManagerRegistration.getInstanceId();
                                if (waitResultConsumedBeforeRelease) {
-                                       // checking whether TaskManagers can be 
safely removed
-                                       
taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
-                                               .thenAcceptAsync(canBeReleased 
-> {
-                                                       if (canBeReleased) {
-                                                               
releaseTaskExecutor(timedOutTaskManagerId);
-                                                       }},
-                                                       mainThreadExecutor);
+                                       
releaseTaskExecutorIfPossible(taskManagerRegistration);
                                } else {
-                                       
releaseTaskExecutor(timedOutTaskManagerId);
+                                       
releaseTaskExecutor(taskManagerRegistration.getInstanceId());
                                }
                        }
                }
        }
 
+       private void releaseTaskExecutorIfPossible(TaskManagerRegistration 
taskManagerRegistration) {
+               long idleSince = taskManagerRegistration.getIdleSince();
+               taskManagerRegistration
+                       .getTaskManagerConnection()
+                       .getTaskExecutorGateway()
+                       .canBeReleased()
+                       .thenAcceptAsync(
+                               canBeReleased -> {
+                                       InstanceID timedOutTaskManagerId = 
taskManagerRegistration.getInstanceId();
+                                       boolean stillIdle = idleSince == 
taskManagerRegistration.getIdleSince();
+                                       if (stillIdle && canBeReleased) {
+                                               
releaseTaskExecutor(timedOutTaskManagerId);
+                                       }
+                               },
+                               mainThreadExecutor);
+       }
+
        private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
                final FlinkException cause = new FlinkException("TaskExecutor 
exceeded the idle timeout.");
                LOG.debug("Release TaskExecutor {} because it exceeded the idle 
timeout.", timedOutTaskManagerId);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 1e1d214..8760f10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -71,7 +71,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -719,7 +718,7 @@ public class SlotManagerTest extends TestLogger {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
                final ResourceID resourceID = ResourceID.generate();
 
-               final AtomicBoolean canBeReleased = new AtomicBoolean(false);
+               final AtomicReference<CompletableFuture<Boolean>> canBeReleased 
= new AtomicReference<>();
                final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
                        .setCanBeReleasedSupplier(canBeReleased::get)
                        .createTestingTaskExecutorGateway();
@@ -742,14 +741,31 @@ public class SlotManagerTest extends TestLogger {
                        mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
 
                        // now it can not be released yet
-                       canBeReleased.set(false);
-                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
+                       canBeReleased.set(new CompletableFuture<>());
+                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger 
TM.canBeReleased request
                        mainThreadExecutor.triggerAll();
-                       assertFalse(releaseFuture.isDone());
+                       canBeReleased.get().complete(false);
+                       mainThreadExecutor.triggerAll();
+                       assertThat(releaseFuture.isDone(), is(false));
+
+                       // Allocate and free slot between triggering 
TM.canBeReleased request and receiving response.
+                       // There can be potentially newly unreleased 
partitions, therefore TM can not be released yet.
+                       canBeReleased.set(new CompletableFuture<>());
+                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger 
TM.canBeReleased request
+                       mainThreadExecutor.triggerAll();
+                       AllocationID allocationID = new AllocationID();
+                       slotManager.registerSlotRequest(new SlotRequest(new 
JobID(), allocationID, resourceProfile, "foobar"));
+                       mainThreadExecutor.triggerAll();
+                       slotManager.freeSlot(slotId, allocationID);
+                       canBeReleased.get().complete(true);
+                       mainThreadExecutor.triggerAll();
+                       assertThat(releaseFuture.isDone(), is(false));
 
                        // now it can and should be released
-                       canBeReleased.set(true);
-                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
+                       canBeReleased.set(new CompletableFuture<>());
+                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger 
TM.canBeReleased request
+                       mainThreadExecutor.triggerAll();
+                       canBeReleased.get().complete(true);
                        mainThreadExecutor.triggerAll();
                        assertThat(releaseFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 8c20e49..3aa00e6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -72,7 +72,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
        private final Function<ExecutionAttemptID, 
CompletableFuture<Acknowledge>> cancelTaskFunction;
 
-       private final Supplier<Boolean> canBeReleasedSupplier;
+       private final Supplier<CompletableFuture<Boolean>> 
canBeReleasedSupplier;
 
        private final BiConsumer<JobID, Collection<ResultPartitionID>> 
releasePartitionsConsumer;
 
@@ -87,7 +87,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
                        Consumer<ResourceID> heartbeatResourceManagerConsumer,
                        Consumer<Exception> disconnectResourceManagerConsumer,
                        Function<ExecutionAttemptID, 
CompletableFuture<Acknowledge>> cancelTaskFunction,
-                       Supplier<Boolean> canBeReleasedSupplier,
+                       Supplier<CompletableFuture<Boolean>> 
canBeReleasedSupplier,
                        BiConsumer<JobID, Collection<ResultPartitionID>> 
releasePartitionsConsumer) {
                this.address = Preconditions.checkNotNull(address);
                this.hostname = Preconditions.checkNotNull(hostname);
@@ -186,7 +186,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
        @Override
        public CompletableFuture<Boolean> canBeReleased() {
-               return 
CompletableFuture.completedFuture(canBeReleasedSupplier.get());
+               return canBeReleasedSupplier.get();
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 770d28e..c176ff8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -64,7 +64,7 @@ public class TestingTaskExecutorGatewayBuilder {
        private Consumer<ResourceID> heartbeatResourceManagerConsumer = 
NOOP_HEARTBEAT_RESOURCE_MANAGER_CONSUMER;
        private Consumer<Exception> disconnectResourceManagerConsumer = 
NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER;
        private Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> 
cancelTaskFunction = NOOP_CANCEL_TASK_FUNCTION;
-       private Supplier<Boolean> canBeReleasedSupplier = () -> true;
+       private Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier = () 
-> CompletableFuture.completedFuture(true);
        private BiConsumer<JobID, Collection<ResultPartitionID>> 
releasePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER;
 
        public TestingTaskExecutorGatewayBuilder setAddress(String address) {
@@ -117,7 +117,7 @@ public class TestingTaskExecutorGatewayBuilder {
                return this;
        }
 
-       public TestingTaskExecutorGatewayBuilder 
setCanBeReleasedSupplier(Supplier<Boolean> canBeReleasedSupplier) {
+       public TestingTaskExecutorGatewayBuilder 
setCanBeReleasedSupplier(Supplier<CompletableFuture<Boolean>> 
canBeReleasedSupplier) {
                this.canBeReleasedSupplier = canBeReleasedSupplier;
                return this;
        }

Reply via email to