This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b0b6669c5f9 [FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport (#27100)
b0b6669c5f9 is described below

commit b0b6669c5f989a53e3aa8763a7b3f91e7c35b7b7
Author: Weihua Hu <[email protected]>
AuthorDate: Wed Dec 3 11:10:18 2025 +0800

    [FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport (#27100)
    
    * [FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport
    
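    SlotReport should only report the current slot usage, while numberSlots is
    an attribute of the TaskManager and should not be included in the
    SlotReport. For example, with dynamic slot allocation the SlotReport can
    contain more slot statuses than the TaskManager's configured number of slots.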
---
 .../runtime/resourcemanager/ResourceManager.java   |  5 +-
 .../resourcemanager/TaskExecutorRegistration.java  | 11 ++++-
 .../registration/WorkerRegistration.java           | 10 +++-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  3 +-
 .../ResourceManagerPartitionLifecycleTest.java     |  3 +-
 .../ResourceManagerTaskExecutorTest.java           |  9 ++--
 .../resourcemanager/ResourceManagerTest.java       |  6 ++-
 .../active/ActiveResourceManagerTest.java          |  3 +-
 .../taskexecutor/TaskExecutorRecoveryTest.java     | 54 ++++++++++++++++++++--
 ...askExecutorToResourceManagerConnectionTest.java |  3 +-
 10 files changed, 89 insertions(+), 18 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8f9a7cf366b..b3f9404e3ee 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -525,7 +525,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                 WorkerResourceSpec workerResourceSpec =
                         WorkerResourceSpec.fromTotalResourceProfile(
                                 
workerTypeWorkerRegistration.getTotalResourceProfile(),
-                                slotReport.getNumSlotStatus());
+                                workerTypeWorkerRegistration.getNumberSlots());
                 onWorkerRegistered(workerTypeWorkerRegistration.getWorker(), 
workerResourceSpec);
             } else if (registrationResult == 
SlotManager.RegistrationResult.REJECTED) {
                 closeTaskManagerConnection(
@@ -1083,7 +1083,8 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                             taskExecutorRegistration.getMemoryConfiguration(),
                             taskExecutorRegistration.getTotalResourceProfile(),
                             
taskExecutorRegistration.getDefaultSlotResourceProfile(),
-                            taskExecutorRegistration.getNodeId());
+                            taskExecutorRegistration.getNodeId(),
+                            taskExecutorRegistration.getNumberSlots());
 
             log.info(
                     "Registering TaskManager with ResourceID {} ({}) at 
ResourceManager",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
index b3b9c51a3f5..6f8530b6a13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -64,6 +64,9 @@ public class TaskExecutorRegistration implements Serializable 
{
      */
     private final String nodeId;
 
+    /** Number of slots in static slot allocation. */
+    private final int numberSlots;
+
     public TaskExecutorRegistration(
             final String taskExecutorAddress,
             final ResourceID resourceId,
@@ -73,7 +76,8 @@ public class TaskExecutorRegistration implements Serializable 
{
             final TaskExecutorMemoryConfiguration memoryConfiguration,
             final ResourceProfile defaultSlotResourceProfile,
             final ResourceProfile totalResourceProfile,
-            final String nodeId) {
+            final String nodeId,
+            final int numberSlots) {
         this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
         this.resourceId = checkNotNull(resourceId);
         this.dataPort = dataPort;
@@ -83,6 +87,7 @@ public class TaskExecutorRegistration implements Serializable 
{
         this.defaultSlotResourceProfile = 
checkNotNull(defaultSlotResourceProfile);
         this.totalResourceProfile = checkNotNull(totalResourceProfile);
         this.nodeId = checkNotNull(nodeId);
+        this.numberSlots = numberSlots;
     }
 
     public String getTaskExecutorAddress() {
@@ -120,4 +125,8 @@ public class TaskExecutorRegistration implements 
Serializable {
     public String getNodeId() {
         return nodeId;
     }
+
+    public int getNumberSlots() {
+        return numberSlots;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 79f402e4cd1..660fb33103d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -45,6 +45,8 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
 
     private final String nodeId;
 
+    private final int numberSlots;
+
     public WorkerRegistration(
             TaskExecutorGateway taskExecutorGateway,
             WorkerType worker,
@@ -54,7 +56,8 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
             TaskExecutorMemoryConfiguration memoryConfiguration,
             ResourceProfile totalResourceProfile,
             ResourceProfile defaultSlotResourceProfile,
-            String nodeId) {
+            String nodeId,
+            int numberSlots) {
 
         super(worker.getResourceID(), taskExecutorGateway);
 
@@ -66,6 +69,7 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
         this.totalResourceProfile = 
Preconditions.checkNotNull(totalResourceProfile);
         this.defaultSlotResourceProfile = 
Preconditions.checkNotNull(defaultSlotResourceProfile);
         this.nodeId = Preconditions.checkNotNull(nodeId);
+        this.numberSlots = numberSlots;
     }
 
     public WorkerType getWorker() {
@@ -99,4 +103,8 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
     public String getNodeId() {
         return nodeId;
     }
+
+    public int getNumberSlots() {
+        return numberSlots;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 17b2adea2e8..88f24d7df0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1568,7 +1568,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                         memoryConfiguration,
                         
taskManagerConfiguration.getDefaultSlotResourceProfile(),
                         taskManagerConfiguration.getTotalResourceProfile(),
-                        unresolvedTaskManagerLocation.getNodeId());
+                        unresolvedTaskManagerLocation.getNodeId(),
+                        taskManagerConfiguration.getNumberSlots());
 
         resourceManagerConnection =
                 new TaskExecutorToResourceManagerConnection(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index 44ca1cded16..e581a037db0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -187,7 +187,8 @@ class ResourceManagerPartitionLifecycleTest {
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         ResourceProfile.ZERO,
                         ResourceProfile.ZERO,
-                        taskExecutorAddress);
+                        taskExecutorAddress,
+                        1);
         final CompletableFuture<RegistrationResponse> registrationFuture =
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 9e4add90f96..761f94b08bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -222,7 +222,8 @@ class ResourceManagerTaskExecutorTest {
                                     1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                             DEFAULT_SLOT_PROFILE,
                             DEFAULT_SLOT_PROFILE,
-                            taskExecutorGateway.getAddress());
+                            taskExecutorGateway.getAddress(),
+                            1);
 
             CompletableFuture<RegistrationResponse> firstFuture =
                     rmGateway.registerTaskExecutor(taskExecutorRegistration, 
fastTimeout);
@@ -287,7 +288,8 @@ class ResourceManagerTaskExecutorTest {
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         DEFAULT_SLOT_PROFILE,
                         DEFAULT_SLOT_PROFILE.multiply(numberSlots),
-                        taskExecutorGateway.getAddress());
+                        taskExecutorGateway.getAddress(),
+                        numberSlots);
         final RegistrationResponse registrationResponse =
                 rmGateway.registerTaskExecutor(taskExecutorRegistration, 
TIMEOUT).get();
         
assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
@@ -364,7 +366,8 @@ class ResourceManagerTaskExecutorTest {
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         DEFAULT_SLOT_PROFILE,
                         DEFAULT_SLOT_PROFILE,
-                        taskExecutorAddress),
+                        taskExecutorAddress,
+                        1),
                 TIMEOUT);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 976f8b65028..935096c6e63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -254,7 +254,8 @@ class ResourceManagerTest {
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         ResourceProfile.ZERO,
                         ResourceProfile.ZERO,
-                        taskExecutorAddress);
+                        taskExecutorAddress,
+                        1);
         final CompletableFuture<RegistrationResponse> registrationFuture =
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
@@ -767,7 +768,8 @@ class ResourceManagerTest {
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         ResourceProfile.fromResources(1, 1024),
                         ResourceProfile.fromResources(1, 
1024).multiply(slotCount),
-                        taskExecutorGateway.getAddress());
+                        taskExecutorGateway.getAddress(),
+                        slotCount);
         RegistrationResponse registrationResult =
                 resourceManagerGateway
                         .registerTaskExecutor(taskExecutorRegistration, 
TestingUtils.TIMEOUT)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 639684b30d9..e39de9da1ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -1245,7 +1245,8 @@ class ActiveResourceManagerTest {
                             TESTING_CONFIG,
                             ResourceProfile.ZERO,
                             ResourceProfile.ZERO,
-                            resourceID.toString());
+                            resourceID.toString(),
+                            1);
 
             return resourceManager
                     .getSelfGateway(ResourceManagerGateway.class)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
index 00ff1df1326..25f2970c365 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
@@ -26,12 +26,15 @@ import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
@@ -63,8 +66,19 @@ class TaskExecutorRecoveryTest {
             new EachCallbackWrapper<>(rpcServiceExtension);
 
     @Test
-    void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File 
tempDir)
-            throws Exception {
+    void 
testRecoveredTaskExecutorWillRestoreAllocationStateWithFixedSlotRequest(
+            @TempDir File tempDir) throws Exception {
+        testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, false);
+    }
+
+    @Test
+    void 
testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest(
+            @TempDir File tempDir) throws Exception {
+        testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, true);
+    }
+
+    private void testRecoveredTaskExecutorWillRestoreAllocationState(
+            File tempDir, boolean useDynamicRequest) throws Exception {
         final ResourceID resourceId = ResourceID.generate();
 
         final Configuration configuration = new Configuration();
@@ -82,6 +96,20 @@ class TaskExecutorRecoveryTest {
                     return 
CompletableFuture.completedFuture(Acknowledge.get());
                 });
 
+        final ArrayBlockingQueue<TaskExecutorRegistration> 
taskExecutorRegistrations =
+                new ArrayBlockingQueue<>(2);
+
+        testingResourceManagerGateway.setRegisterTaskExecutorFunction(
+                taskExecutorRegistration -> {
+                    taskExecutorRegistrations.offer(taskExecutorRegistration);
+                    return CompletableFuture.completedFuture(
+                            new TaskExecutorRegistrationSuccess(
+                                    new InstanceID(),
+                                    taskExecutorRegistration.getResourceId(),
+                                    new ClusterInformation("localhost", 1234),
+                                    null));
+                });
+
         final TestingRpcService rpcService = 
rpcServiceExtension.getTestingRpcService();
         rpcService.registerGateway(
                 testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
@@ -118,8 +146,14 @@ class TaskExecutorRecoveryTest {
 
         assertThat(slotReport.getNumSlotStatus(), is(2));
 
+        final TaskExecutorRegistration taskExecutorRegistration = 
taskExecutorRegistrations.take();
+        assertThat(taskExecutorRegistration.getNumberSlots(), is(2));
+
         final SlotStatus slotStatus = slotReport.iterator().next();
-        final SlotID allocatedSlotID = slotStatus.getSlotID();
+        final SlotID allocatedSlotID =
+                useDynamicRequest
+                        ? 
SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID())
+                        : slotStatus.getSlotID();
 
         final AllocationID allocationId = new AllocationID();
         taskExecutorGateway
@@ -160,9 +194,15 @@ class TaskExecutorRecoveryTest {
         recoveredTaskExecutor.start();
 
         final TaskExecutorSlotReport recoveredSlotReport = queue.take();
-
+        final int expectedNumberOfSlots = useDynamicRequest ? 3 : 2;
+        assertThat(
+                recoveredSlotReport.getSlotReport().getNumSlotStatus(), 
is(expectedNumberOfSlots));
         for (SlotStatus status : recoveredSlotReport.getSlotReport()) {
-            if (status.getSlotID().equals(allocatedSlotID)) {
+            boolean isAllocatedSlot =
+                    useDynamicRequest
+                            ? status.getSlotID().getSlotNumber() == 2
+                            : status.getSlotID().equals(allocatedSlotID);
+            if (isAllocatedSlot) {
                 assertThat(status.getJobID(), is(jobId));
                 assertThat(status.getAllocationID(), is(allocationId));
             } else {
@@ -170,6 +210,10 @@ class TaskExecutorRecoveryTest {
             }
         }
 
+        final TaskExecutorRegistration recoveredTaskExecutorRegistration =
+                taskExecutorRegistrations.take();
+        assertThat(recoveredTaskExecutorRegistration.getNumberSlots(), is(2));
+
         final Collection<SlotOffer> take = offeredSlots.take();
 
         assertThat(take, hasSize(1));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
index 6d19c1d6c26..4fd0545328f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
@@ -140,7 +140,8 @@ class TaskExecutorToResourceManagerConnectionTest {
                         TASK_MANAGER_MEMORY_CONFIGURATION,
                         ResourceProfile.ZERO,
                         ResourceProfile.ZERO,
-                        TASK_MANAGER_NODE_ID);
+                        TASK_MANAGER_NODE_ID,
+                        1);
         return new TaskExecutorToResourceManagerConnection(
                 LOGGER,
                 rpcService,

Reply via email to