This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5cabecb0f7b7ad25036badca40500782eacef848
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Dec 15 17:53:58 2020 +0100

    [FLINK-20605][coordination] Rework cancellation of slot allocation futures
    
    The previous approach did not work properly, because the future could
    already have been completed by the time it was cancelled (e.g., because the
    corresponding task executor was unregistered). This order of events can
    occur because the allocation is processed asynchronously, and that
    processing can be scheduled after any other event.
    This caused the processing to run even though we expected it not to,
    resulting in various errors, including:
    - completing an allocation despite being shut down
    - completing an allocation despite the task executor not being registered 
anymore
    - completing an allocation despite the slot report already having reported
      the slot as allocated
---
 .../slotmanager/DeclarativeSlotManager.java        | 41 +++-------
 .../slotmanager/DeclarativeSlotManagerTest.java    | 87 +++++++++++++++++++++-
 2 files changed, 97 insertions(+), 31 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index bd976fd..fd2eb4f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -49,11 +49,11 @@ import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
@@ -79,7 +79,7 @@ public class DeclarativeSlotManager implements SlotManager {
        private final SlotManagerMetricGroup slotManagerMetricGroup;
 
        private final Map<JobID, String> jobMasterTargetAddresses = new 
HashMap<>();
-       private final HashMap<SlotID, CompletableFuture<Acknowledge>> 
pendingSlotAllocationFutures;
+       private final Set<SlotID> pendingSlotAllocations;
 
        private boolean sendNotEnoughResourceNotifications = true;
 
@@ -110,7 +110,7 @@ public class DeclarativeSlotManager implements SlotManager {
                this.slotManagerMetricGroup = 
Preconditions.checkNotNull(slotManagerMetricGroup);
                this.resourceTracker = 
Preconditions.checkNotNull(resourceTracker);
 
-               pendingSlotAllocationFutures = new HashMap<>(16);
+               pendingSlotAllocations = new HashSet<>(16);
 
                this.slotTracker = Preconditions.checkNotNull(slotTracker);
                
slotTracker.registerSlotStatusUpdateListener(createSlotStatusUpdateListener());
@@ -139,7 +139,7 @@ public class DeclarativeSlotManager implements SlotManager {
        private SlotStatusUpdateListener createSlotStatusUpdateListener() {
                return (taskManagerSlot, previous, current, jobId) -> {
                        if (previous == SlotState.PENDING) {
-                               
cancelAllocationFuture(taskManagerSlot.getSlotId());
+                               
pendingSlotAllocations.remove(taskManagerSlot.getSlotId());
                        }
 
                        if (current == SlotState.PENDING) {
@@ -158,14 +158,6 @@ public class DeclarativeSlotManager implements SlotManager 
{
                };
        }
 
-       private void cancelAllocationFuture(SlotID slotId) {
-               final CompletableFuture<Acknowledge> 
acknowledgeCompletableFuture = pendingSlotAllocationFutures.remove(slotId);
-               // the future may be null if we are just re-playing the state 
transitions due to a slot report
-               if (acknowledgeCompletableFuture != null) {
-                       acknowledgeCompletableFuture.cancel(false);
-               }
-       }
-
        @Override
        public void setFailUnfulfillableRequest(boolean 
failUnfulfillableRequest) {
                // this sets up a grace period, e.g., when the cluster was 
started, to give task executors time to connect
@@ -489,11 +481,9 @@ public class DeclarativeSlotManager implements SlotManager 
{
                final TaskExecutorConnection taskExecutorConnection = 
taskManagerSlot.getTaskManagerConnection();
                final TaskExecutorGateway gateway = 
taskExecutorConnection.getTaskExecutorGateway();
 
-               final CompletableFuture<Acknowledge> completableFuture = new 
CompletableFuture<>();
-
                slotTracker.notifyAllocationStart(slotId, jobId);
                taskExecutorManager.markUsed(instanceId);
-               pendingSlotAllocationFutures.put(slotId, completableFuture);
+               pendingSlotAllocations.add(slotId);
 
                // RPC call to the task manager
                CompletableFuture<Acknowledge> requestFuture = 
gateway.requestSlot(
@@ -505,17 +495,12 @@ public class DeclarativeSlotManager implements 
SlotManager {
                        resourceManagerId,
                        taskManagerRequestTimeout);
 
-               requestFuture.whenComplete(
+               CompletableFuture<Void> slotAllocationResponseProcessingFuture 
= requestFuture.handleAsync(
                        (Acknowledge acknowledge, Throwable throwable) -> {
-                               if (acknowledge != null) {
-                                       completableFuture.complete(acknowledge);
-                               } else {
-                                       
completableFuture.completeExceptionally(throwable);
+                               if (!pendingSlotAllocations.contains(slotId)) {
+                                       LOG.debug("Ignoring slot allocation 
update from task executor {} for slot {} and job {}, because the allocation was 
already completed or cancelled.", instanceId, slotId, jobId);
+                                       return null;
                                }
-                       });
-
-               CompletableFuture<Void> slotAllocationResponseProcessingFuture 
= completableFuture.handleAsync(
-                       (Acknowledge acknowledge, Throwable throwable) -> {
                                if (acknowledge != null) {
                                        LOG.trace("Completed allocation of slot 
{} for job {}.", slotId, jobId);
                                        
slotTracker.notifyAllocationComplete(slotId, jobId);
@@ -527,12 +512,8 @@ public class DeclarativeSlotManager implements SlotManager 
{
                                                // this could be a problem if 
we ever assume that the task executor always reports about all slots
                                                
slotTracker.notifySlotStatus(Collections.singleton(new SlotStatus(slotId, 
taskManagerSlot.getResourceProfile(), exception.getJobId(), 
exception.getAllocationId())));
                                        } else {
-                                               if (throwable instanceof 
CancellationException) {
-                                                       LOG.debug("Cancelled 
allocation of slot {} for job {}.", slotId, jobId, throwable);
-                                               } else {
-                                                       LOG.warn("Slot 
allocation for slot {} for job {} failed.", slotId, jobId, throwable);
-                                                       
slotTracker.notifyFree(slotId);
-                                               }
+                                               LOG.warn("Slot allocation for 
slot {} for job {} failed.", slotId, jobId, throwable);
+                                               slotTracker.notifyFree(slotId);
                                        }
                                        checkResourceRequirements();
                                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 4543e05..155d9c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple6;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -43,6 +44,7 @@ import 
org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionUtils;
@@ -971,6 +973,76 @@ public class DeclarativeSlotManagerTest extends TestLogger 
{
                }
        }
 
+       @Test
+       public void testAllocationUpdatesIgnoredIfTaskExecutorUnregistered() 
throws Exception {
+               final ManuallyTriggeredScheduledExecutorService executor = new 
ManuallyTriggeredScheduledExecutorService();
+
+               final ResourceTracker resourceTracker = new 
DefaultResourceTracker();
+
+               final TestingTaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       // it is important that the returned future is already 
completed
+                       // otherwise it will be cancelled when the task 
executor is unregistered
+                       .setRequestSlotFunction(ignored -> 
CompletableFuture.completedFuture(Acknowledge.get()))
+                       .createTestingTaskExecutorGateway();
+
+               final SystemExitTrackingSecurityManager trackingSecurityManager 
= new SystemExitTrackingSecurityManager();
+               System.setSecurityManager(trackingSecurityManager);
+               try (final DeclarativeSlotManager slotManager = 
createDeclarativeSlotManagerBuilder()
+                       .setResourceTracker(resourceTracker)
+                       .buildAndStart(ResourceManagerId.generate(), executor, 
new TestingResourceActionsBuilder().build())) {
+
+                       JobID jobId = new JobID();
+                       
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+
+                       final TaskExecutorConnection taskExecutionConnection = 
createTaskExecutorConnection(taskExecutorGateway);
+                       final SlotReport slotReport = 
createSlotReport(taskExecutionConnection.getResourceID(), 1);
+
+                       
slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+                       
slotManager.unregisterTaskManager(taskExecutionConnection.getInstanceID(), 
TEST_EXCEPTION);
+
+                       executor.triggerAll();
+
+                       
assertThat(trackingSecurityManager.getSystemExitFuture().isDone(), is(false));
+               } finally {
+                       System.setSecurityManager(null);
+               }
+       }
+
+       @Test
+       public void 
testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport() throws 
Exception {
+               final ManuallyTriggeredScheduledExecutorService executor = new 
ManuallyTriggeredScheduledExecutorService();
+
+               final ResourceTracker resourceTracker = new 
DefaultResourceTracker();
+
+               final TestingTaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       // it is important that the returned future is already 
completed
+                       // otherwise it will be cancelled when the task 
executor is unregistered
+                       .setRequestSlotFunction(ignored -> 
CompletableFuture.completedFuture(Acknowledge.get()))
+                       .createTestingTaskExecutorGateway();
+
+               final SystemExitTrackingSecurityManager trackingSecurityManager 
= new SystemExitTrackingSecurityManager();
+               System.setSecurityManager(trackingSecurityManager);
+               try (final DeclarativeSlotManager slotManager = 
createDeclarativeSlotManagerBuilder()
+                       .setResourceTracker(resourceTracker)
+                       .buildAndStart(ResourceManagerId.generate(), executor, 
new TestingResourceActionsBuilder().build())) {
+
+                       JobID jobId = new JobID();
+                       
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+
+                       final TaskExecutorConnection taskExecutionConnection = 
createTaskExecutorConnection(taskExecutorGateway);
+                       final SlotReport slotReport = 
createSlotReport(taskExecutionConnection.getResourceID(), 1);
+
+                       
slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+                       
slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), 
createSlotReportWithAllocatedSlots(taskExecutionConnection.getResourceID(), 
jobId, 1));
+
+                       executor.triggerAll();
+
+                       
assertThat(trackingSecurityManager.getSystemExitFuture().isDone(), is(false));
+               } finally {
+                       System.setSecurityManager(null);
+               }
+       }
+
        private static SlotReport createSlotReport(ResourceID 
taskExecutorResourceId, int numberSlots) {
                final Set<SlotStatus> slotStatusSet = new 
HashSet<>(numberSlots);
                for (int i = 0; i < numberSlots; i++) {
@@ -980,12 +1052,25 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
                return new SlotReport(slotStatusSet);
        }
 
+       private static SlotReport createSlotReportWithAllocatedSlots(ResourceID 
taskExecutorResourceId, JobID jobId, int numberSlots) {
+               final Set<SlotStatus> slotStatusSet = new 
HashSet<>(numberSlots);
+               for (int i = 0; i < numberSlots; i++) {
+                       slotStatusSet.add(createAllocatedSlotStatus(new 
SlotID(taskExecutorResourceId, i), jobId));
+               }
+
+               return new SlotReport(slotStatusSet);
+       }
+
        private static SlotStatus createFreeSlotStatus(SlotID slotId) {
                return new SlotStatus(slotId, ResourceProfile.ANY);
        }
 
        private static SlotStatus createAllocatedSlotStatus(SlotID slotId) {
-               return new SlotStatus(slotId, ResourceProfile.ANY, 
JobID.generate(), new AllocationID());
+               return createAllocatedSlotStatus(slotId, JobID.generate());
+       }
+
+       private static SlotStatus createAllocatedSlotStatus(SlotID slotId, 
JobID jobId) {
+               return new SlotStatus(slotId, ResourceProfile.ANY, jobId, new 
AllocationID());
        }
 
        private DeclarativeSlotManager createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions) {

Reply via email to