This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5cabecb0f7b7ad25036badca40500782eacef848 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Dec 15 17:53:58 2020 +0100 [FLINK-20605][coordination] Rework cancellation of slot allocation futures The previous approach did not work properly because the future could already have been completed by the time it was cancelled (e.g., because the corresponding task executor was unregistered). This order of events can happen since the processing of the allocation is done asynchronously, and can be scheduled after any other event. This caused the processing to run although we expected it not to happen, resulting in various errors, including: - completing an allocation despite being shut down - completing an allocation despite the task executor not being registered anymore - completing an allocation despite the slot report already having reported a slot as allocated --- .../slotmanager/DeclarativeSlotManager.java | 41 +++------- .../slotmanager/DeclarativeSlotManagerTest.java | 87 +++++++++++++++++++++- 2 files changed, 97 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java index bd976fd..fd2eb4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java @@ -49,11 +49,11 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import 
java.util.function.BiFunction; @@ -79,7 +79,7 @@ public class DeclarativeSlotManager implements SlotManager { private final SlotManagerMetricGroup slotManagerMetricGroup; private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>(); - private final HashMap<SlotID, CompletableFuture<Acknowledge>> pendingSlotAllocationFutures; + private final Set<SlotID> pendingSlotAllocations; private boolean sendNotEnoughResourceNotifications = true; @@ -110,7 +110,7 @@ public class DeclarativeSlotManager implements SlotManager { this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup); this.resourceTracker = Preconditions.checkNotNull(resourceTracker); - pendingSlotAllocationFutures = new HashMap<>(16); + pendingSlotAllocations = new HashSet<>(16); this.slotTracker = Preconditions.checkNotNull(slotTracker); slotTracker.registerSlotStatusUpdateListener(createSlotStatusUpdateListener()); @@ -139,7 +139,7 @@ public class DeclarativeSlotManager implements SlotManager { private SlotStatusUpdateListener createSlotStatusUpdateListener() { return (taskManagerSlot, previous, current, jobId) -> { if (previous == SlotState.PENDING) { - cancelAllocationFuture(taskManagerSlot.getSlotId()); + pendingSlotAllocations.remove(taskManagerSlot.getSlotId()); } if (current == SlotState.PENDING) { @@ -158,14 +158,6 @@ public class DeclarativeSlotManager implements SlotManager { }; } - private void cancelAllocationFuture(SlotID slotId) { - final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = pendingSlotAllocationFutures.remove(slotId); - // the future may be null if we are just re-playing the state transitions due to a slot report - if (acknowledgeCompletableFuture != null) { - acknowledgeCompletableFuture.cancel(false); - } - } - @Override public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) { // this sets up a grace period, e.g., when the cluster was started, to give task executors time to connect @@ -489,11 +481,9 @@ public 
class DeclarativeSlotManager implements SlotManager { final TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection(); final TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway(); - final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>(); - slotTracker.notifyAllocationStart(slotId, jobId); taskExecutorManager.markUsed(instanceId); - pendingSlotAllocationFutures.put(slotId, completableFuture); + pendingSlotAllocations.add(slotId); // RPC call to the task manager CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot( @@ -505,17 +495,12 @@ public class DeclarativeSlotManager implements SlotManager { resourceManagerId, taskManagerRequestTimeout); - requestFuture.whenComplete( + CompletableFuture<Void> slotAllocationResponseProcessingFuture = requestFuture.handleAsync( (Acknowledge acknowledge, Throwable throwable) -> { - if (acknowledge != null) { - completableFuture.complete(acknowledge); - } else { - completableFuture.completeExceptionally(throwable); + if (!pendingSlotAllocations.contains(slotId)) { + LOG.debug("Ignoring slot allocation update from task executor {} for slot {} and job {}, because the allocation was already completed or cancelled.", instanceId, slotId, jobId); + return null; } - }); - - CompletableFuture<Void> slotAllocationResponseProcessingFuture = completableFuture.handleAsync( - (Acknowledge acknowledge, Throwable throwable) -> { if (acknowledge != null) { LOG.trace("Completed allocation of slot {} for job {}.", slotId, jobId); slotTracker.notifyAllocationComplete(slotId, jobId); @@ -527,12 +512,8 @@ public class DeclarativeSlotManager implements SlotManager { // this could be a problem if we ever assume that the task executor always reports about all slots slotTracker.notifySlotStatus(Collections.singleton(new SlotStatus(slotId, taskManagerSlot.getResourceProfile(), exception.getJobId(), exception.getAllocationId()))); } else { - if (throwable 
instanceof CancellationException) { - LOG.debug("Cancelled allocation of slot {} for job {}.", slotId, jobId, throwable); - } else { - LOG.warn("Slot allocation for slot {} for job {} failed.", slotId, jobId, throwable); - slotTracker.notifyFree(slotId); - } + LOG.warn("Slot allocation for slot {} for job {} failed.", slotId, jobId, throwable); + slotTracker.notifyFree(slotId); } checkResourceRequirements(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java index 4543e05..155d9c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -43,6 +44,7 @@ import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.FunctionUtils; @@ -971,6 +973,76 @@ public class 
DeclarativeSlotManagerTest extends TestLogger { } } + @Test + public void testAllocationUpdatesIgnoredIfTaskExecutorUnregistered() throws Exception { + final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService(); + + final ResourceTracker resourceTracker = new DefaultResourceTracker(); + + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + // it is important that the returned future is already completed + // otherwise it will be cancelled when the task executor is unregistered + .setRequestSlotFunction(ignored -> CompletableFuture.completedFuture(Acknowledge.get())) + .createTestingTaskExecutorGateway(); + + final SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager(); + System.setSecurityManager(trackingSecurityManager); + try (final DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder() + .setResourceTracker(resourceTracker) + .buildAndStart(ResourceManagerId.generate(), executor, new TestingResourceActionsBuilder().build())) { + + JobID jobId = new JobID(); + slotManager.processResourceRequirements(createResourceRequirements(jobId, 1)); + + final TaskExecutorConnection taskExecutionConnection = createTaskExecutorConnection(taskExecutorGateway); + final SlotReport slotReport = createSlotReport(taskExecutionConnection.getResourceID(), 1); + + slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.unregisterTaskManager(taskExecutionConnection.getInstanceID(), TEST_EXCEPTION); + + executor.triggerAll(); + + assertThat(trackingSecurityManager.getSystemExitFuture().isDone(), is(false)); + } finally { + System.setSecurityManager(null); + } + } + + @Test + public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport() throws Exception { + final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService(); + + final 
ResourceTracker resourceTracker = new DefaultResourceTracker(); + + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + // it is important that the returned future is already completed + // otherwise it will be cancelled when the task executor is unregistered + .setRequestSlotFunction(ignored -> CompletableFuture.completedFuture(Acknowledge.get())) + .createTestingTaskExecutorGateway(); + + final SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager(); + System.setSecurityManager(trackingSecurityManager); + try (final DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder() + .setResourceTracker(resourceTracker) + .buildAndStart(ResourceManagerId.generate(), executor, new TestingResourceActionsBuilder().build())) { + + JobID jobId = new JobID(); + slotManager.processResourceRequirements(createResourceRequirements(jobId, 1)); + + final TaskExecutorConnection taskExecutionConnection = createTaskExecutorConnection(taskExecutorGateway); + final SlotReport slotReport = createSlotReport(taskExecutionConnection.getResourceID(), 1); + + slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), createSlotReportWithAllocatedSlots(taskExecutionConnection.getResourceID(), jobId, 1)); + + executor.triggerAll(); + + assertThat(trackingSecurityManager.getSystemExitFuture().isDone(), is(false)); + } finally { + System.setSecurityManager(null); + } + } + private static SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) { final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots); for (int i = 0; i < numberSlots; i++) { @@ -980,12 +1052,25 @@ public class DeclarativeSlotManagerTest extends TestLogger { return new SlotReport(slotStatusSet); } + private static SlotReport createSlotReportWithAllocatedSlots(ResourceID taskExecutorResourceId, JobID jobId, int 
numberSlots) { + final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots); + for (int i = 0; i < numberSlots; i++) { + slotStatusSet.add(createAllocatedSlotStatus(new SlotID(taskExecutorResourceId, i), jobId)); + } + + return new SlotReport(slotStatusSet); + } + private static SlotStatus createFreeSlotStatus(SlotID slotId) { return new SlotStatus(slotId, ResourceProfile.ANY); } private static SlotStatus createAllocatedSlotStatus(SlotID slotId) { - return new SlotStatus(slotId, ResourceProfile.ANY, JobID.generate(), new AllocationID()); + return createAllocatedSlotStatus(slotId, JobID.generate()); + } + + private static SlotStatus createAllocatedSlotStatus(SlotID slotId, JobID jobId) { + return new SlotStatus(slotId, ResourceProfile.ANY, jobId, new AllocationID()); } private DeclarativeSlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
