This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 435ebfa82c3b67bfe722833b61d604915ac9ab6d Author: Yangze Guo <[email protected]> AuthorDate: Wed Nov 25 17:09:58 2020 +0800 [FLINK-20836][runtime] Pass total resource and default slot resource in registering TaskManager to SlotManager This closes #14561 --- .../runtime/resourcemanager/ResourceManager.java | 6 +- .../slotmanager/DeclarativeSlotManager.java | 7 +- .../resourcemanager/slotmanager/SlotManager.java | 7 +- .../slotmanager/SlotManagerImpl.java | 7 +- .../slotmanager/DeclarativeSlotManagerTest.java | 74 ++++++++++++----- .../SlotManagerFailUnfulfillableTest.java | 3 +- .../slotmanager/SlotManagerImplTest.java | 97 ++++++++++++++++------ .../slotmanager/SlotProtocolTest.java | 10 ++- .../TaskManagerCheckInSlotManagerTest.java | 17 +++- .../slotmanager/TestingSlotManager.java | 5 +- 10 files changed, 177 insertions(+), 56 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index a2b4977..deb94b0 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -464,7 +464,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> taskExecutors.get(taskManagerResourceId); if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) { - if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) { + if (slotManager.registerTaskManager( + workerTypeWorkerRegistration, + slotReport, + workerTypeWorkerRegistration.getTotalResourceProfile(), + workerTypeWorkerRegistration.getDefaultSlotResourceProfile())) { onWorkerRegistered(workerTypeWorkerRegistration.getWorker()); } return CompletableFuture.completedFuture(Acknowledge.get()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java index 188a230..8aed8f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java @@ -270,12 +270,17 @@ public class DeclarativeSlotManager implements SlotManager { * * @param taskExecutorConnection for the new task manager * @param initialSlotReport for the new task manager + * @param totalResourceProfile for the new task manager + * @param defaultSlotResourceProfile for the new task manager * @return True if the task manager has not been registered before and is registered * successfully; otherwise false */ @Override public boolean registerTaskManager( - final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + final TaskExecutorConnection taskExecutorConnection, + SlotReport initialSlotReport, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile) { checkInit(); LOG.debug( "Registering task executor {} under {} at the slot manager.", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index b074562..e24056f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -122,11 +122,16 @@ public interface SlotManager extends AutoCloseable { * * @param taskExecutorConnection for the new task manager * @param initialSlotReport for the new task manager + * @param totalResourceProfile for the new task manager + * @param defaultSlotResourceProfile for the new task manager * @return True if the task manager has not been registered before and is registered * successfully; otherwise false */ boolean registerTaskManager( - TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport); + TaskExecutorConnection taskExecutorConnection, + SlotReport initialSlotReport, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile); /** * Unregisters the task manager identified by the given instance id and its associated slots diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java index b59b436..e1b8329 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java @@ -454,12 +454,17 @@ public class SlotManagerImpl implements SlotManager { * * @param taskExecutorConnection for the new task manager * @param initialSlotReport for the new task manager + * @param totalResourceProfile for the new task manager + * @param defaultSlotResourceProfile for the new task manager * @return True if the task manager has not been registered before and is registered * successfully; otherwise false */ @Override public boolean registerTaskManager( - final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + final TaskExecutorConnection taskExecutorConnection, + SlotReport initialSlotReport, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile) { checkInit(); LOG.debug( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java index 77d3a33..7cab61f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java @@ -126,7 +126,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { .setSlotTracker(slotTracker) .buildAndStartWithDirectExec()) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertThat( "The number registered slots does not equal the expected number.", @@ -165,7 +166,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { .setSlotTracker(slotTracker) .buildAndStartWithDirectExec()) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertEquals( "The number registered slots does not equal the expected number.", @@ -305,7 +307,11 @@ public class DeclarativeSlotManagerTest extends TestLogger { if (scenario == RequirementDeclarationScenario .TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION) { - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); } final ResourceRequirements requirements = @@ -318,7 +324,11 @@ public class DeclarativeSlotManagerTest extends TestLogger { if (scenario == RequirementDeclarationScenario .TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION) { - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); } assertThat( @@ -357,7 +367,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { .setSlotTracker(slotTracker) .buildAndStartWithDirectExec()) { - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId); @@ -404,7 +415,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { .setSlotTracker(slotTracker) .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.processResourceRequirements(requirements); @@ -455,7 +467,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { .setSlotTracker(slotTracker) .buildAndStartWithDirectExec()) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.processResourceRequirements(resourceRequirements1); @@ -546,7 +559,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { // check that we don't have any slots registered assertEquals(0, slotManager.getNumberRegisteredSlots()); - slotManager.registerTaskManager(taskManagerConnection, slotReport1); + slotManager.registerTaskManager( + taskManagerConnection, slotReport1, ResourceProfile.ANY, ResourceProfile.ANY); DeclarativeTaskManagerSlot slot1 = slotTracker.getSlot(slotId1); DeclarativeTaskManagerSlot slot2 = slotTracker.getSlot(slotId2); @@ -606,7 +620,10 @@ public class DeclarativeSlotManagerTest extends TestLogger { CompletableFuture.runAsync( () -> slotManager.registerTaskManager( - taskManagerConnection, slotReport), + taskManagerConnection, + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY), mainThreadExecutor) .thenRun( () -> @@ -661,7 +678,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { .setSlotTracker(slotTracker) .buildAndStartWithDirectExec()) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.processResourceRequirements(resourceRequirements); @@ -735,7 +753,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { mainThreadExecutor, new TestingResourceActionsBuilder().build()); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.processResourceRequirements(resourceRequirements); final SlotID firstRequestedSlotId = requestedSlotIds.take(); @@ -782,7 +801,11 @@ public class DeclarativeSlotManagerTest extends TestLogger { final SlotID slotId = new SlotID(taskManagerId, 0); final SlotReport initialSlotReport = new SlotReport(createFreeSlotStatus(slotId)); - slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport); + slotManager.registerTaskManager( + taskExecutorConnection, + initialSlotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); assertThat(slotManager.getNumberRegisteredSlots(), is(equalTo(1))); @@ -860,7 +883,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { final SlotReport slotReport = new SlotReport(createFreeSlotStatus(new SlotID(taskExecutorResourceId, 0))); - slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.registerTaskManager( + taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> firstRequest = requestSlotQueue.take(); @@ -941,7 +965,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { final SlotReport slotReport = new SlotReport(createFreeSlotStatus(new SlotID(taskExecutorResourceId, 0))); - slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.registerTaskManager( + taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> firstRequest = requestSlotQueue.take(); @@ -986,7 +1011,11 @@ public class DeclarativeSlotManagerTest extends TestLogger { final SlotReport slotReport1 = createSlotReport(taskExecutionConnection1.getResourceID(), 2); - slotManager.registerTaskManager(taskExecutionConnection1, slotReport1); + slotManager.registerTaskManager( + taskExecutionConnection1, + slotReport1, + ResourceProfile.ANY, + ResourceProfile.ANY); slotManager.unregisterTaskManager( taskExecutionConnection1.getInstanceID(), TEST_EXCEPTION); @@ -1096,7 +1125,11 @@ public class DeclarativeSlotManagerTest extends TestLogger { createTaskExecutorConnection(taskExecutorGateway); final SlotReport firstSlotReport = createSlotReport(firstTaskExecutorConnection.getResourceID(), 2); - slotManager.registerTaskManager(firstTaskExecutorConnection, firstSlotReport); + slotManager.registerTaskManager( + firstTaskExecutorConnection, + firstSlotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); } @Test @@ -1146,7 +1179,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { .createTestingTaskExecutorGateway()); final SlotReport slotReport = createSlotReport(taskExecutorResourceId, numExistingSlots); - slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.registerTaskManager( + taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); ResourceRequirements resourceRequirements = createResourceRequirements(jobId, numRequiredSlots); @@ -1203,7 +1237,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { final SlotReport slotReport = createSlotReport(taskExecutionConnection.getResourceID(), 1); - slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.registerTaskManager( + taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.unregisterTaskManager( taskExecutionConnection.getInstanceID(), TEST_EXCEPTION); @@ -1250,7 +1285,8 @@ public class DeclarativeSlotManagerTest extends TestLogger { final SlotReport slotReport = createSlotReport(taskExecutionConnection.getResourceID(), 1); - slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.registerTaskManager( + taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.reportSlotStatus( taskExecutionConnection.getInstanceID(), createSlotReportWithAllocatedSlots( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java index 23d3964..fee3971 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java @@ -234,7 +234,8 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger { Collections.singleton( new SlotStatus(new SlotID(resourceID, 0), slotProfile))); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); } private static SlotRequest slotRequest(ResourceProfile profile) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java index 5e5850b..4c1aba5 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java @@ -124,7 +124,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertTrue( "The number registered slots does not equal the expected number.", @@ -170,7 +171,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertTrue( "The number registered slots does not equal the expected number.", @@ -294,7 +296,8 @@ public class SlotManagerImplTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); final SlotReport slotReport = new SlotReport(slotStatus); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertTrue( "The slot request should be accepted", @@ -352,7 +355,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -428,7 +432,8 @@ public class SlotManagerImplTest extends TestLogger { assertThat(numberAllocateResourceCalls.get(), is(1)); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertThat( requestFuture.get(), @@ -471,7 +476,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -552,7 +558,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertFalse(slotManager.registerSlotRequest(slotRequest)); } @@ -593,7 +600,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertTrue(slotManager.registerSlotRequest(slotRequest1)); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -645,7 +653,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertTrue(slotManager.registerSlotRequest(slotRequest1)); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -735,7 +744,8 @@ public class SlotManagerImplTest extends TestLogger { // check that we don't have any slots registered assertTrue(0 == slotManager.getNumberRegisteredSlots()); - slotManager.registerTaskManager(taskManagerConnection, slotReport1); + slotManager.registerTaskManager( + taskManagerConnection, slotReport1, ResourceProfile.ANY, ResourceProfile.ANY); TaskManagerSlot slot1 = slotManager.getSlot(slotId1); TaskManagerSlot slot2 = slotManager.getSlot(slotId2); @@ -853,7 +863,8 @@ public class SlotManagerImplTest extends TestLogger { try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.registerSlotRequest(slotRequest); @@ -939,7 +950,10 @@ public class SlotManagerImplTest extends TestLogger { CompletableFuture.supplyAsync( () -> { slotManager.registerTaskManager( - taskManagerConnection, slotReport); + taskManagerConnection, + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); return null; }, @@ -1054,7 +1068,10 @@ public class SlotManagerImplTest extends TestLogger { .thenRun( () -> slotManager.registerTaskManager( - taskManagerConnection, initialSlotReport)); + taskManagerConnection, + initialSlotReport, + ResourceProfile.ANY, + ResourceProfile.ANY)); final SlotID slotId = requestedSlotFuture.get(); @@ -1128,7 +1145,11 @@ public class SlotManagerImplTest extends TestLogger { .setRedundantTaskManagerNum(0) .buildAndStartWithDirectExec(resourceManagerId, resourceActions)) { - slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport); + slotManager.registerTaskManager( + taskExecutorConnection, + initialSlotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); assertEquals(1, slotManager.getNumberRegisteredSlots()); @@ -1167,7 +1188,11 @@ public class SlotManagerImplTest extends TestLogger { final SlotStatus initialSlotStatus = new SlotStatus(slotId, ResourceProfile.ANY); final SlotReport initialSlotReport = new SlotReport(initialSlotStatus); - slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport); + slotManager.registerTaskManager( + taskExecutorConnection, + initialSlotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); assertThat(slotManager.getNumberRegisteredSlots(), is(equalTo(1))); @@ -1246,7 +1271,8 @@ public class SlotManagerImplTest extends TestLogger { new CompletableFuture<>(); responseQueue.offer(firstManualSlotRequestResponse); - slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.registerTaskManager( + taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> firstRequest = requestSlotQueue.take(); @@ -1328,7 +1354,8 @@ public class SlotManagerImplTest extends TestLogger { new CompletableFuture<>(); responseQueue.offer(firstManualSlotRequestResponse); - slotManager.registerTaskManager(taskExecutionConnection, slotReport); + slotManager.registerTaskManager( + taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> firstRequest = requestSlotQueue.take(); @@ -1402,7 +1429,11 @@ public class SlotManagerImplTest extends TestLogger { // register the task-manager-1 to the slot manager, this will trigger the slot // allocation for job1. - slotManager.registerTaskManager(taskExecutionConnection1, slotReport1); + slotManager.registerTaskManager( + taskExecutionConnection1, + slotReport1, + ResourceProfile.ANY, + ResourceProfile.ANY); // register slot request for job2. JobID jobId2 = new JobID(); @@ -1427,7 +1458,11 @@ public class SlotManagerImplTest extends TestLogger { // register the task-manager-2 to the slot manager, this will trigger the slot // allocation for job2 and job3. - slotManager.registerTaskManager(taskExecutionConnection2, slotReport2); + slotManager.registerTaskManager( + taskExecutionConnection2, + slotReport2, + ResourceProfile.ANY, + ResourceProfile.ANY); // validate for job1. slotManager.unregisterTaskManager( @@ -1628,7 +1663,8 @@ public class SlotManagerImplTest extends TestLogger { resourceProfile, SlotManagerImplTest::createEmptySlotStatus); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots - 1)); assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1)); @@ -1676,7 +1712,8 @@ public class SlotManagerImplTest extends TestLogger { offeredSlotProfile, SlotManagerImplTest::createEmptySlotStatus); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertThat(slotManager.getNumberRegisteredSlots(), is(numberOfferedSlots)); assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); @@ -1701,7 +1738,8 @@ public class SlotManagerImplTest extends TestLogger { new SlotStatus(slotId, ResourceProfile.ANY, jobId, new AllocationID()); final SlotReport slotReport = new SlotReport(slotStatus); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertThat(slotManager.getNumberRegisteredSlots(), is(1)); assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); @@ -1730,7 +1768,8 @@ public class SlotManagerImplTest extends TestLogger { SlotReport slotReport = createSingleAllocatedSlotReport( taskExecutorConnection.getResourceID(), new JobID()); - slotManager.registerTaskManager(taskExecutorConnection, slotReport); + slotManager.registerTaskManager( + taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); slotManager.unregisterTaskManager(taskExecutorConnection.getInstanceID(), failureCause); assertThat(allocationFailureCause.get(), FlinkMatchers.containsCause(failureCause)); @@ -1869,8 +1908,10 @@ public class SlotManagerImplTest extends TestLogger { .setMaxSlotNum(maxSlotNum) .setRedundantTaskManagerNum(0) .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection1, slotReport1); - slotManager.registerTaskManager(taskManagerConnection2, slotReport2); + slotManager.registerTaskManager( + taskManagerConnection1, slotReport1, ResourceProfile.ANY, ResourceProfile.ANY); + slotManager.registerTaskManager( + taskManagerConnection2, slotReport2, ResourceProfile.ANY, ResourceProfile.ANY); assertThat( "The number registered slots does not equal the expected number.", @@ -1901,7 +1942,11 @@ public class SlotManagerImplTest extends TestLogger { createTaskExecutorConnection(taskExecutorGateway); final SlotReport firstSlotReport = createSlotReport(firstTaskExecutorConnection.getResourceID(), 2); - slotManager.registerTaskManager(firstTaskExecutorConnection, firstSlotReport); + slotManager.registerTaskManager( + firstTaskExecutorConnection, + firstSlotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 6156264..13d5401 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -127,7 +127,10 @@ public class SlotProtocolTest extends TestLogger { final SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); // register slot at SlotManager slotManager.registerTaskManager( - new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport); + new TaskExecutorConnection(resourceID, taskExecutorGateway), + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); // 4) Slot becomes available and TaskExecutor gets a SlotRequest assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); @@ -173,7 +176,10 @@ public class SlotProtocolTest extends TestLogger { final SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); // register slot at SlotManager slotManager.registerTaskManager( - new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport); + new TaskExecutorConnection(resourceID, taskExecutorGateway), + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY); final String targetAddress = "foobar"; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java index 7985804..d7c4881 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java @@ -228,7 +228,8 @@ public class TaskManagerCheckInSlotManagerTest extends TestLogger { .setRedundantTaskManagerNum(redundantTaskManagerNum) .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) { - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerTaskManager( + taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -289,13 +290,23 @@ public class TaskManagerCheckInSlotManagerTest extends TestLogger { new TaskExecutorConnection(resourceID, taskExecutorGateway); mainThreadExecutor.execute( - () -> slotManager.registerTaskManager(taskManagerConnection, slotReport)); + () -> + slotManager.registerTaskManager( + taskManagerConnection, + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY)); } private SlotManagerImpl createAndStartSlotManagerWithTM() { SlotManagerImpl slotManager = createAndStartSlotManager(0, 1); mainThreadExecutor.execute( - () -> slotManager.registerTaskManager(taskManagerConnection, slotReport)); + () -> + slotManager.registerTaskManager( + taskManagerConnection, + slotReport, + ResourceProfile.ANY, + ResourceProfile.ANY)); return slotManager; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java index 5982aab..801965e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java @@ -121,7 +121,10 @@ public class TestingSlotManager implements SlotManager { @Override public boolean registerTaskManager( - TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + TaskExecutorConnection taskExecutorConnection, + SlotReport initialSlotReport, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile) { return true; }
