This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 04f2f0c2660b312449419a3acb58a46a38d84f64 Author: Lijie Wang <[email protected]> AuthorDate: Mon Jul 4 14:49:10 2022 +0800 [FLINK-28144][runtime] Introduce SlotPoolService#releaseFreeSlotsOnTaskManager to release free slots eagerly when blocking nodes --- .../slotpool/DeclarativeSlotPoolService.java | 26 ++++++++++++ .../jobmaster/slotpool/SlotPoolService.java | 8 ++++ .../flink/runtime/jobmaster/JobMasterTest.java | 6 +++ .../slotpool/DeclarativeSlotPoolServiceTest.java | 47 ++++++++++++++++++++++ .../jobmaster/slotpool/TestingSlotPoolService.java | 6 +++ 5 files changed, 93 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java index ec1a97005b6..5379c96cfea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** {@link SlotPoolService} implementation for the {@link DeclarativeSlotPool}. */ public class DeclarativeSlotPoolService implements SlotPoolService { @@ -234,6 +235,31 @@ public class DeclarativeSlotPoolService implements SlotPoolService { return false; } + @Override + public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) { + assertHasBeenStarted(); + if (isTaskManagerRegistered(taskManagerId)) { + + Collection<AllocationID> freeSlots = + declarativeSlotPool.getFreeSlotsInformation().stream() + .filter( + slotInfo -> + slotInfo.getTaskManagerLocation() + .getResourceID() + .equals(taskManagerId)) + .map(SlotInfoWithUtilization::getAllocationId) + .collect(Collectors.toSet()); + + for (AllocationID allocationId : freeSlots) { + final ResourceCounter previouslyFulfilledRequirement = + declarativeSlotPool.releaseSlot(allocationId, cause); + // release free slots, previously fulfilled requirement should be empty. + Preconditions.checkState( + previouslyFulfilledRequirement.equals(ResourceCounter.empty())); + } + } + } + private void releaseAllTaskManagers(Exception cause) { for (ResourceID registeredTaskManager : registeredTaskManagers) { internalReleaseTaskManager(registeredTaskManager, cause); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java index fc8b3671ee7..b0322dd2c5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java @@ -113,6 +113,14 @@ public interface SlotPoolService extends AutoCloseable { */ boolean releaseTaskManager(ResourceID taskManagerId, Exception cause); + /** + * Releases all free slots belonging to the owning TaskExecutor if it has been registered. + * + * @param taskManagerId identifying the TaskExecutor + * @param cause cause for failing the slots + */ + void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause); + /** * Connects the SlotPool to the given ResourceManager. After this method is called, the SlotPool * will be able to request resources from the given ResourceManager. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index d7a3afbb688..e07ad72516d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -518,6 +518,12 @@ class JobMasterTest { return true; } + @Override + public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) { + throw new UnsupportedOperationException( + "TestingSlotPool does not support this operation."); + } + @Override public Collection<SlotOffer> offerSlots( TaskManagerLocation taskManagerLocation, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java index 21d47552cfb..9d540128848 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java @@ -43,6 +43,8 @@ import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.FlinkException; import org.apache.flink.util.clock.SystemClock; +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; + import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -306,6 +308,51 @@ class DeclarativeSlotPoolServiceTest { } } + @Test + void testReleaseFreeSlotsOnTaskManager() throws Exception { + try (DeclarativeSlotPoolService slotPoolService = createDeclarativeSlotPoolService()) { + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + slotPoolService.registerTaskManager(taskManagerLocation.getResourceID()); + + final ResourceProfile resourceProfile = + ResourceProfile.newBuilder().setCpuCores(1).build(); + + SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0, resourceProfile); + SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1, resourceProfile); + + final DeclarativeSlotPool slotPool = slotPoolService.getDeclarativeSlotPool(); + slotPool.setResourceRequirements(ResourceCounter.withResource(resourceProfile, 2)); + + final DefaultDeclarativeSlotPoolTest.FreeSlotConsumer freeSlotConsumer = + new DefaultDeclarativeSlotPoolTest.FreeSlotConsumer(); + + final Collection<SlotOffer> slotOffers = Arrays.asList(slotOffer1, slotOffer2); + + slotPoolService.offerSlots( + taskManagerLocation, + new RpcTaskManagerGateway( + new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(), + jobMasterId), + slotOffers); + + // slot1 is reserved, slot2 is free. + slotPool.reserveFreeSlot(slotOffer1.getAllocationId(), resourceProfile); + + slotPoolService.releaseFreeSlotsOnTaskManager( + taskManagerLocation.getResourceID(), new FlinkException("Test cause")); + + assertThat(slotPool.getFreeSlotsInformation()).isEmpty(); + assertThat( + Iterables.getOnlyElement(slotPool.getAllSlotsInformation()) + .getAllocationId()) + .isEqualTo(slotOffer1.getAllocationId()); + assertThat(Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots())) + .isEqualTo(slotOffer2.getAllocationId()); + } + } + private DeclarativeSlotPoolService createDeclarativeSlotPoolService() throws Exception { return createDeclarativeSlotPoolService(new DefaultDeclarativeSlotPoolFactory()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java index 295f4059e24..5d10289686b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java @@ -147,6 +147,12 @@ public class TestingSlotPoolService implements SlotPoolService { return releaseTaskManagerFunction.apply(taskManagerId, cause); } + @Override + public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) { + throw new UnsupportedOperationException( + "TestingSlotPoolService does not support this operation."); + } + @Override public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) { connectToResourceManagerConsumer.accept(resourceManagerGateway);
