This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 435ebfa82c3b67bfe722833b61d604915ac9ab6d
Author: Yangze Guo <[email protected]>
AuthorDate: Wed Nov 25 17:09:58 2020 +0800

    [FLINK-20836][runtime] Pass total resource and default slot resource in 
registering TaskManager to SlotManager
    
    This closes #14561
---
 .../runtime/resourcemanager/ResourceManager.java   |  6 +-
 .../slotmanager/DeclarativeSlotManager.java        |  7 +-
 .../resourcemanager/slotmanager/SlotManager.java   |  7 +-
 .../slotmanager/SlotManagerImpl.java               |  7 +-
 .../slotmanager/DeclarativeSlotManagerTest.java    | 74 ++++++++++++-----
 .../SlotManagerFailUnfulfillableTest.java          |  3 +-
 .../slotmanager/SlotManagerImplTest.java           | 97 ++++++++++++++++------
 .../slotmanager/SlotProtocolTest.java              | 10 ++-
 .../TaskManagerCheckInSlotManagerTest.java         | 17 +++-
 .../slotmanager/TestingSlotManager.java            |  5 +-
 10 files changed, 177 insertions(+), 56 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a2b4977..deb94b0 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -464,7 +464,11 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                 taskExecutors.get(taskManagerResourceId);
 
         if 
(workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId))
 {
-            if (slotManager.registerTaskManager(workerTypeWorkerRegistration, 
slotReport)) {
+            if (slotManager.registerTaskManager(
+                    workerTypeWorkerRegistration,
+                    slotReport,
+                    workerTypeWorkerRegistration.getTotalResourceProfile(),
+                    
workerTypeWorkerRegistration.getDefaultSlotResourceProfile())) {
                 onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
             }
             return CompletableFuture.completedFuture(Acknowledge.get());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 188a230..8aed8f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -270,12 +270,17 @@ public class DeclarativeSlotManager implements 
SlotManager {
      *
      * @param taskExecutorConnection for the new task manager
      * @param initialSlotReport for the new task manager
+     * @param totalResourceProfile for the new task manager
+     * @param defaultSlotResourceProfile for the new task manager
      * @return True if the task manager has not been registered before and is 
registered
      *     successfully; otherwise false
      */
     @Override
     public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection, SlotReport 
initialSlotReport) {
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
         checkInit();
         LOG.debug(
                 "Registering task executor {} under {} at the slot manager.",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index b074562..e24056f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -122,11 +122,16 @@ public interface SlotManager extends AutoCloseable {
      *
      * @param taskExecutorConnection for the new task manager
      * @param initialSlotReport for the new task manager
+     * @param totalResourceProfile for the new task manager
+     * @param defaultSlotResourceProfile for the new task manager
      * @return True if the task manager has not been registered before and is 
registered
      *     successfully; otherwise false
      */
     boolean registerTaskManager(
-            TaskExecutorConnection taskExecutorConnection, SlotReport 
initialSlotReport);
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
 
     /**
      * Unregisters the task manager identified by the given instance id and 
its associated slots
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index b59b436..e1b8329 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -454,12 +454,17 @@ public class SlotManagerImpl implements SlotManager {
      *
      * @param taskExecutorConnection for the new task manager
      * @param initialSlotReport for the new task manager
+     * @param totalResourceProfile for the new task manager
+     * @param defaultSlotResourceProfile for the new task manager
      * @return True if the task manager has not been registered before and is 
registered
      *     successfully; otherwise false
      */
     @Override
     public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection, SlotReport 
initialSlotReport) {
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
         checkInit();
 
         LOG.debug(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 77d3a33..7cab61f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -126,7 +126,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec()) {
 
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertThat(
                     "The number registered slots does not equal the expected 
number.",
@@ -165,7 +166,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec()) {
 
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertEquals(
                     "The number registered slots does not equal the expected 
number.",
@@ -305,7 +307,11 @@ public class DeclarativeSlotManagerTest extends TestLogger 
{
             if (scenario
                     == RequirementDeclarationScenario
                             
.TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION) {
-                slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+                slotManager.registerTaskManager(
+                        taskExecutorConnection,
+                        slotReport,
+                        ResourceProfile.ANY,
+                        ResourceProfile.ANY);
             }
 
             final ResourceRequirements requirements =
@@ -318,7 +324,11 @@ public class DeclarativeSlotManagerTest extends TestLogger 
{
             if (scenario
                     == RequirementDeclarationScenario
                             
.TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION) {
-                slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+                slotManager.registerTaskManager(
+                        taskExecutorConnection,
+                        slotReport,
+                        ResourceProfile.ANY,
+                        ResourceProfile.ANY);
             }
 
             assertThat(
@@ -357,7 +367,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec()) {
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId);
 
@@ -404,7 +415,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec(resourceManagerId, 
resourceManagerActions)) {
 
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             slotManager.processResourceRequirements(requirements);
 
@@ -455,7 +467,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec()) {
 
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             slotManager.processResourceRequirements(resourceRequirements1);
 
@@ -546,7 +559,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
             // check that we don't have any slots registered
             assertEquals(0, slotManager.getNumberRegisteredSlots());
 
-            slotManager.registerTaskManager(taskManagerConnection, 
slotReport1);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport1, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             DeclarativeTaskManagerSlot slot1 = slotTracker.getSlot(slotId1);
             DeclarativeTaskManagerSlot slot2 = slotTracker.getSlot(slotId2);
@@ -606,7 +620,10 @@ public class DeclarativeSlotManagerTest extends TestLogger 
{
             CompletableFuture.runAsync(
                             () ->
                                     slotManager.registerTaskManager(
-                                            taskManagerConnection, slotReport),
+                                            taskManagerConnection,
+                                            slotReport,
+                                            ResourceProfile.ANY,
+                                            ResourceProfile.ANY),
                             mainThreadExecutor)
                     .thenRun(
                             () ->
@@ -661,7 +678,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec()) {
 
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             slotManager.processResourceRequirements(resourceRequirements);
 
@@ -735,7 +753,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
                     mainThreadExecutor,
                     new TestingResourceActionsBuilder().build());
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
             slotManager.processResourceRequirements(resourceRequirements);
 
             final SlotID firstRequestedSlotId = requestedSlotIds.take();
@@ -782,7 +801,11 @@ public class DeclarativeSlotManagerTest extends TestLogger 
{
             final SlotID slotId = new SlotID(taskManagerId, 0);
             final SlotReport initialSlotReport = new 
SlotReport(createFreeSlotStatus(slotId));
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
initialSlotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection,
+                    initialSlotReport,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             assertThat(slotManager.getNumberRegisteredSlots(), is(equalTo(1)));
 
@@ -860,7 +883,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
             final SlotReport slotReport =
                     new SlotReport(createFreeSlotStatus(new 
SlotID(taskExecutorResourceId, 0)));
 
-            slotManager.registerTaskManager(taskExecutionConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, 
ResourceManagerId>
                     firstRequest = requestSlotQueue.take();
@@ -941,7 +965,8 @@ public class DeclarativeSlotManagerTest extends TestLogger {
             final SlotReport slotReport =
                     new SlotReport(createFreeSlotStatus(new 
SlotID(taskExecutorResourceId, 0)));
 
-            slotManager.registerTaskManager(taskExecutionConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, 
ResourceManagerId>
                     firstRequest = requestSlotQueue.take();
@@ -986,7 +1011,11 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
             final SlotReport slotReport1 =
                     createSlotReport(taskExecutionConnection1.getResourceID(), 
2);
 
-            slotManager.registerTaskManager(taskExecutionConnection1, 
slotReport1);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection1,
+                    slotReport1,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             slotManager.unregisterTaskManager(
                     taskExecutionConnection1.getInstanceID(), TEST_EXCEPTION);
@@ -1096,7 +1125,11 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
                 createTaskExecutorConnection(taskExecutorGateway);
         final SlotReport firstSlotReport =
                 createSlotReport(firstTaskExecutorConnection.getResourceID(), 
2);
-        slotManager.registerTaskManager(firstTaskExecutorConnection, 
firstSlotReport);
+        slotManager.registerTaskManager(
+                firstTaskExecutorConnection,
+                firstSlotReport,
+                ResourceProfile.ANY,
+                ResourceProfile.ANY);
     }
 
     @Test
@@ -1146,7 +1179,8 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
                                     .createTestingTaskExecutorGateway());
             final SlotReport slotReport =
                     createSlotReport(taskExecutorResourceId, numExistingSlots);
-            slotManager.registerTaskManager(taskExecutionConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             ResourceRequirements resourceRequirements =
                     createResourceRequirements(jobId, numRequiredSlots);
@@ -1203,7 +1237,8 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
             final SlotReport slotReport =
                     createSlotReport(taskExecutionConnection.getResourceID(), 
1);
 
-            slotManager.registerTaskManager(taskExecutionConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
             slotManager.unregisterTaskManager(
                     taskExecutionConnection.getInstanceID(), TEST_EXCEPTION);
 
@@ -1250,7 +1285,8 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
             final SlotReport slotReport =
                     createSlotReport(taskExecutionConnection.getResourceID(), 
1);
 
-            slotManager.registerTaskManager(taskExecutionConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
             slotManager.reportSlotStatus(
                     taskExecutionConnection.getInstanceID(),
                     createSlotReportWithAllocatedSlots(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
index 23d3964..fee3971 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
@@ -234,7 +234,8 @@ public class SlotManagerFailUnfulfillableTest extends 
TestLogger {
                         Collections.singleton(
                                 new SlotStatus(new SlotID(resourceID, 0), 
slotProfile)));
 
-        slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+        slotManager.registerTaskManager(
+                taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
     }
 
     private static SlotRequest slotRequest(ResourceProfile profile) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
index 5e5850b..4c1aba5 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
@@ -124,7 +124,8 @@ public class SlotManagerImplTest extends TestLogger {
 
         try (SlotManagerImpl slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertTrue(
                     "The number registered slots does not equal the expected 
number.",
@@ -170,7 +171,8 @@ public class SlotManagerImplTest extends TestLogger {
 
         try (SlotManagerImpl slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertTrue(
                     "The number registered slots does not equal the expected 
number.",
@@ -294,7 +296,8 @@ public class SlotManagerImplTest extends TestLogger {
             final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
             final SlotReport slotReport = new SlotReport(slotStatus);
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertTrue(
                     "The slot request should be accepted",
@@ -352,7 +355,8 @@ public class SlotManagerImplTest extends TestLogger {
 
         try (SlotManagerImpl slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -428,7 +432,8 @@ public class SlotManagerImplTest extends TestLogger {
 
             assertThat(numberAllocateResourceCalls.get(), is(1));
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertThat(
                     requestFuture.get(),
@@ -471,7 +476,8 @@ public class SlotManagerImplTest extends TestLogger {
         try (SlotManagerImpl slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -552,7 +558,8 @@ public class SlotManagerImplTest extends TestLogger {
 
         try (SlotManager slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertFalse(slotManager.registerSlotRequest(slotRequest));
         }
@@ -593,7 +600,8 @@ public class SlotManagerImplTest extends TestLogger {
 
         try (SlotManagerImpl slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
             assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
             TaskManagerSlot slot = slotManager.getSlot(slotId);
@@ -645,7 +653,8 @@ public class SlotManagerImplTest extends TestLogger {
 
         try (SlotManagerImpl slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
             assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
             TaskManagerSlot slot = slotManager.getSlot(slotId);
@@ -735,7 +744,8 @@ public class SlotManagerImplTest extends TestLogger {
             // check that we don't have any slots registered
             assertTrue(0 == slotManager.getNumberRegisteredSlots());
 
-            slotManager.registerTaskManager(taskManagerConnection, 
slotReport1);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport1, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
             TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
@@ -853,7 +863,8 @@ public class SlotManagerImplTest extends TestLogger {
         try (SlotManagerImpl slotManager =
                 createSlotManager(resourceManagerId, resourceManagerActions)) {
 
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             slotManager.registerSlotRequest(slotRequest);
 
@@ -939,7 +950,10 @@ public class SlotManagerImplTest extends TestLogger {
                     CompletableFuture.supplyAsync(
                                     () -> {
                                         slotManager.registerTaskManager(
-                                                taskManagerConnection, 
slotReport);
+                                                taskManagerConnection,
+                                                slotReport,
+                                                ResourceProfile.ANY,
+                                                ResourceProfile.ANY);
 
                                         return null;
                                     },
@@ -1054,7 +1068,10 @@ public class SlotManagerImplTest extends TestLogger {
                     .thenRun(
                             () ->
                                     slotManager.registerTaskManager(
-                                            taskManagerConnection, 
initialSlotReport));
+                                            taskManagerConnection,
+                                            initialSlotReport,
+                                            ResourceProfile.ANY,
+                                            ResourceProfile.ANY));
 
             final SlotID slotId = requestedSlotFuture.get();
 
@@ -1128,7 +1145,11 @@ public class SlotManagerImplTest extends TestLogger {
                         .setRedundantTaskManagerNum(0)
                         .buildAndStartWithDirectExec(resourceManagerId, 
resourceActions)) {
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
initialSlotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection,
+                    initialSlotReport,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             assertEquals(1, slotManager.getNumberRegisteredSlots());
 
@@ -1167,7 +1188,11 @@ public class SlotManagerImplTest extends TestLogger {
             final SlotStatus initialSlotStatus = new SlotStatus(slotId, 
ResourceProfile.ANY);
             final SlotReport initialSlotReport = new 
SlotReport(initialSlotStatus);
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
initialSlotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection,
+                    initialSlotReport,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             assertThat(slotManager.getNumberRegisteredSlots(), is(equalTo(1)));
 
@@ -1246,7 +1271,8 @@ public class SlotManagerImplTest extends TestLogger {
                     new CompletableFuture<>();
             responseQueue.offer(firstManualSlotRequestResponse);
 
-            slotManager.registerTaskManager(taskExecutionConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, 
ResourceManagerId>
                     firstRequest = requestSlotQueue.take();
@@ -1328,7 +1354,8 @@ public class SlotManagerImplTest extends TestLogger {
                     new CompletableFuture<>();
             responseQueue.offer(firstManualSlotRequestResponse);
 
-            slotManager.registerTaskManager(taskExecutionConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, 
ResourceManagerId>
                     firstRequest = requestSlotQueue.take();
@@ -1402,7 +1429,11 @@ public class SlotManagerImplTest extends TestLogger {
 
             // register the task-manager-1 to the slot manager, this will 
trigger the slot
             // allocation for job1.
-            slotManager.registerTaskManager(taskExecutionConnection1, 
slotReport1);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection1,
+                    slotReport1,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             // register slot request for job2.
             JobID jobId2 = new JobID();
@@ -1427,7 +1458,11 @@ public class SlotManagerImplTest extends TestLogger {
 
             // register the task-manager-2 to the slot manager, this will 
trigger the slot
             // allocation for job2 and job3.
-            slotManager.registerTaskManager(taskExecutionConnection2, 
slotReport2);
+            slotManager.registerTaskManager(
+                    taskExecutionConnection2,
+                    slotReport2,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             // validate for job1.
             slotManager.unregisterTaskManager(
@@ -1628,7 +1663,8 @@ public class SlotManagerImplTest extends TestLogger {
                             resourceProfile,
                             SlotManagerImplTest::createEmptySlotStatus);
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots 
- 1));
             assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1));
@@ -1676,7 +1712,8 @@ public class SlotManagerImplTest extends TestLogger {
                             offeredSlotProfile,
                             SlotManagerImplTest::createEmptySlotStatus);
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertThat(slotManager.getNumberRegisteredSlots(), 
is(numberOfferedSlots));
             assertThat(slotManager.getNumberPendingTaskManagerSlots(), 
is(numberSlots));
@@ -1701,7 +1738,8 @@ public class SlotManagerImplTest extends TestLogger {
                     new SlotStatus(slotId, ResourceProfile.ANY, jobId, new 
AllocationID());
             final SlotReport slotReport = new SlotReport(slotStatus);
 
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertThat(slotManager.getNumberRegisteredSlots(), is(1));
             assertThat(slotManager.getNumberPendingTaskManagerSlots(), 
is(numberSlots));
@@ -1730,7 +1768,8 @@ public class SlotManagerImplTest extends TestLogger {
             SlotReport slotReport =
                     createSingleAllocatedSlotReport(
                             taskExecutorConnection.getResourceID(), new 
JobID());
-            slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+            slotManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
             
slotManager.unregisterTaskManager(taskExecutorConnection.getInstanceID(), 
failureCause);
 
             assertThat(allocationFailureCause.get(), 
FlinkMatchers.containsCause(failureCause));
@@ -1869,8 +1908,10 @@ public class SlotManagerImplTest extends TestLogger {
                         .setMaxSlotNum(maxSlotNum)
                         .setRedundantTaskManagerNum(0)
                         .buildAndStartWithDirectExec(resourceManagerId, 
resourceManagerActions)) {
-            slotManager.registerTaskManager(taskManagerConnection1, 
slotReport1);
-            slotManager.registerTaskManager(taskManagerConnection2, 
slotReport2);
+            slotManager.registerTaskManager(
+                    taskManagerConnection1, slotReport1, ResourceProfile.ANY, 
ResourceProfile.ANY);
+            slotManager.registerTaskManager(
+                    taskManagerConnection2, slotReport2, ResourceProfile.ANY, 
ResourceProfile.ANY);
 
             assertThat(
                     "The number registered slots does not equal the expected 
number.",
@@ -1901,7 +1942,11 @@ public class SlotManagerImplTest extends TestLogger {
                 createTaskExecutorConnection(taskExecutorGateway);
         final SlotReport firstSlotReport =
                 createSlotReport(firstTaskExecutorConnection.getResourceID(), 
2);
-        slotManager.registerTaskManager(firstTaskExecutorConnection, 
firstSlotReport);
+        slotManager.registerTaskManager(
+                firstTaskExecutorConnection,
+                firstSlotReport,
+                ResourceProfile.ANY,
+                ResourceProfile.ANY);
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 6156264..13d5401 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -127,7 +127,10 @@ public class SlotProtocolTest extends TestLogger {
             final SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
             // register slot at SlotManager
             slotManager.registerTaskManager(
-                    new TaskExecutorConnection(resourceID, 
taskExecutorGateway), slotReport);
+                    new TaskExecutorConnection(resourceID, 
taskExecutorGateway),
+                    slotReport,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             // 4) Slot becomes available and TaskExecutor gets a SlotRequest
             assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, 
jobID, allocationID))));
@@ -173,7 +176,10 @@ public class SlotProtocolTest extends TestLogger {
             final SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
             // register slot at SlotManager
             slotManager.registerTaskManager(
-                    new TaskExecutorConnection(resourceID, 
taskExecutorGateway), slotReport);
+                    new TaskExecutorConnection(resourceID, 
taskExecutorGateway),
+                    slotReport,
+                    ResourceProfile.ANY,
+                    ResourceProfile.ANY);
 
             final String targetAddress = "foobar";
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
index 7985804..d7c4881 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
@@ -228,7 +228,8 @@ public class TaskManagerCheckInSlotManagerTest extends 
TestLogger {
                         .setRedundantTaskManagerNum(redundantTaskManagerNum)
                         .buildAndStartWithDirectExec(resourceManagerId, 
resourceManagerActions)) {
 
-            slotManager.registerTaskManager(taskManagerConnection, slotReport);
+            slotManager.registerTaskManager(
+                    taskManagerConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
             assertThat(releaseFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
         }
     }
@@ -289,13 +290,23 @@ public class TaskManagerCheckInSlotManagerTest extends 
TestLogger {
                 new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
         mainThreadExecutor.execute(
-                () -> slotManager.registerTaskManager(taskManagerConnection, 
slotReport));
+                () ->
+                        slotManager.registerTaskManager(
+                                taskManagerConnection,
+                                slotReport,
+                                ResourceProfile.ANY,
+                                ResourceProfile.ANY));
     }
 
     private SlotManagerImpl createAndStartSlotManagerWithTM() {
         SlotManagerImpl slotManager = createAndStartSlotManager(0, 1);
         mainThreadExecutor.execute(
-                () -> slotManager.registerTaskManager(taskManagerConnection, 
slotReport));
+                () ->
+                        slotManager.registerTaskManager(
+                                taskManagerConnection,
+                                slotReport,
+                                ResourceProfile.ANY,
+                                ResourceProfile.ANY));
         return slotManager;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
index 5982aab..801965e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -121,7 +121,10 @@ public class TestingSlotManager implements SlotManager {
 
     @Override
     public boolean registerTaskManager(
-            TaskExecutorConnection taskExecutorConnection, SlotReport 
initialSlotReport) {
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
         return true;
     }
 

Reply via email to