This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eab0a1faf5e7ecf8da641880b8913d49ac19da2b
Author: Lijie Wang <[email protected]>
AuthorDate: Tue Jun 28 22:55:43 2022 +0800

    [FLINK-28142][runtime] Enrich TaskExecutorRegistration with node information
    
    This closes #20056.
---
 .../flink/runtime/resourcemanager/ResourceManager.java  |  3 ++-
 .../resourcemanager/TaskExecutorRegistration.java       | 17 ++++++++++++++++-
 .../registration/WorkerRegistration.java                | 10 +++++++++-
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java |  3 ++-
 .../ResourceManagerPartitionLifecycleTest.java          |  3 ++-
 .../ResourceManagerTaskExecutorTest.java                |  9 ++++++---
 .../runtime/resourcemanager/ResourceManagerTest.java    |  3 ++-
 .../active/ActiveResourceManagerTest.java               |  3 ++-
 .../TaskExecutorToResourceManagerConnectionTest.java    |  7 ++++++-
 9 files changed, 47 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c94d6f60a03..c845d997e0d 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -942,7 +942,8 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                             taskExecutorRegistration.getHardwareDescription(),
                             taskExecutorRegistration.getMemoryConfiguration(),
                             taskExecutorRegistration.getTotalResourceProfile(),
-                            
taskExecutorRegistration.getDefaultSlotResourceProfile());
+                            
taskExecutorRegistration.getDefaultSlotResourceProfile(),
+                            taskExecutorRegistration.getNodeId());
 
             log.info(
                     "Registering TaskManager with ResourceID {} ({}) at 
ResourceManager",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
index 771084f5ee0..b3b9c51a3f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -55,6 +56,14 @@ public class TaskExecutorRegistration implements 
Serializable {
     /** The task executor total resource profile. */
     private final ResourceProfile totalResourceProfile;
 
+    /**
+     * ID of the node where the TaskManager is located on. In Yarn and Native 
Kubernetes mode, this
+     * value will be set by resource manager when launch this TaskManager(via 
the config option
+     * {@link TaskManagerOptionsInternal#TASK_MANAGER_NODE_ID}). In other 
modes, this value will be
+     * the external address of the TaskManager.
+     */
+    private final String nodeId;
+
     public TaskExecutorRegistration(
             final String taskExecutorAddress,
             final ResourceID resourceId,
@@ -63,7 +72,8 @@ public class TaskExecutorRegistration implements Serializable 
{
             final HardwareDescription hardwareDescription,
             final TaskExecutorMemoryConfiguration memoryConfiguration,
             final ResourceProfile defaultSlotResourceProfile,
-            final ResourceProfile totalResourceProfile) {
+            final ResourceProfile totalResourceProfile,
+            final String nodeId) {
         this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
         this.resourceId = checkNotNull(resourceId);
         this.dataPort = dataPort;
@@ -72,6 +82,7 @@ public class TaskExecutorRegistration implements Serializable 
{
         this.memoryConfiguration = checkNotNull(memoryConfiguration);
         this.defaultSlotResourceProfile = 
checkNotNull(defaultSlotResourceProfile);
         this.totalResourceProfile = checkNotNull(totalResourceProfile);
+        this.nodeId = checkNotNull(nodeId);
     }
 
     public String getTaskExecutorAddress() {
@@ -105,4 +116,8 @@ public class TaskExecutorRegistration implements 
Serializable {
     public ResourceProfile getTotalResourceProfile() {
         return totalResourceProfile;
     }
+
+    public String getNodeId() {
+        return nodeId;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index f5e77b4a0f5..79f402e4cd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -43,6 +43,8 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
 
     private final ResourceProfile defaultSlotResourceProfile;
 
+    private final String nodeId;
+
     public WorkerRegistration(
             TaskExecutorGateway taskExecutorGateway,
             WorkerType worker,
@@ -51,7 +53,8 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
             HardwareDescription hardwareDescription,
             TaskExecutorMemoryConfiguration memoryConfiguration,
             ResourceProfile totalResourceProfile,
-            ResourceProfile defaultSlotResourceProfile) {
+            ResourceProfile defaultSlotResourceProfile,
+            String nodeId) {
 
         super(worker.getResourceID(), taskExecutorGateway);
 
@@ -62,6 +65,7 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
         this.memoryConfiguration = 
Preconditions.checkNotNull(memoryConfiguration);
         this.totalResourceProfile = 
Preconditions.checkNotNull(totalResourceProfile);
         this.defaultSlotResourceProfile = 
Preconditions.checkNotNull(defaultSlotResourceProfile);
+        this.nodeId = Preconditions.checkNotNull(nodeId);
     }
 
     public WorkerType getWorker() {
@@ -91,4 +95,8 @@ public class WorkerRegistration<WorkerType extends 
ResourceIDRetrievable>
     public ResourceProfile getTotalResourceProfile() {
         return totalResourceProfile;
     }
+
+    public String getNodeId() {
+        return nodeId;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 55bc617f5b5..a7df344756c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1350,7 +1350,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                         hardwareDescription,
                         memoryConfiguration,
                         
taskManagerConfiguration.getDefaultSlotResourceProfile(),
-                        taskManagerConfiguration.getTotalResourceProfile());
+                        taskManagerConfiguration.getTotalResourceProfile(),
+                        unresolvedTaskManagerLocation.getNodeId());
 
         resourceManagerConnection =
                 new TaskExecutorToResourceManagerConnection(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index c527fdeca31..d66dfb85067 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -195,7 +195,8 @@ public class ResourceManagerPartitionLifecycleTest extends 
TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         ResourceProfile.ZERO,
-                        ResourceProfile.ZERO);
+                        ResourceProfile.ZERO,
+                        taskExecutorAddress);
         final CompletableFuture<RegistrationResponse> registrationFuture =
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index f50ecc366f1..9f14d33b1cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -238,7 +238,8 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                             new TaskExecutorMemoryConfiguration(
                                     1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                             DEFAULT_SLOT_PROFILE,
-                            DEFAULT_SLOT_PROFILE);
+                            DEFAULT_SLOT_PROFILE,
+                            taskExecutorGateway.getAddress());
 
             CompletableFuture<RegistrationResponse> firstFuture =
                     rmGateway.registerTaskExecutor(taskExecutorRegistration, 
fastTimeout);
@@ -307,7 +308,8 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         DEFAULT_SLOT_PROFILE,
-                        DEFAULT_SLOT_PROFILE.multiply(numberSlots));
+                        DEFAULT_SLOT_PROFILE.multiply(numberSlots),
+                        taskExecutorGateway.getAddress());
         final RegistrationResponse registrationResponse =
                 rmGateway.registerTaskExecutor(taskExecutorRegistration, 
TIMEOUT).get();
         assertThat(registrationResponse, 
instanceOf(TaskExecutorRegistrationSuccess.class));
@@ -384,7 +386,8 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         DEFAULT_SLOT_PROFILE,
-                        DEFAULT_SLOT_PROFILE),
+                        DEFAULT_SLOT_PROFILE,
+                        taskExecutorAddress),
                 TIMEOUT);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 0564bbc373d..d8a93a948cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -233,7 +233,8 @@ public class ResourceManagerTest extends TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         ResourceProfile.ZERO,
-                        ResourceProfile.ZERO);
+                        ResourceProfile.ZERO,
+                        taskExecutorAddress);
         final CompletableFuture<RegistrationResponse> registrationFuture =
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 6129f9a34b2..08ba16d314c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -949,7 +949,8 @@ public class ActiveResourceManagerTest extends TestLogger {
                             new HardwareDescription(1, 2L, 3L, 4L),
                             TESTING_CONFIG,
                             ResourceProfile.ZERO,
-                            ResourceProfile.ZERO);
+                            ResourceProfile.ZERO,
+                            resourceID.toString());
 
             return resourceManager
                     .getSelfGateway(ResourceManagerGateway.class)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
index 6e4616f7a09..7f153a175e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
@@ -65,6 +65,8 @@ class TaskExecutorToResourceManagerConnectionTest {
 
     private static final int TASK_MANAGER_JMX_PORT = 23456;
 
+    private static final String TASK_MANAGER_NODE_ID = "local";
+
     private static final HardwareDescription TASK_MANAGER_HARDWARE_DESCRIPTION 
=
             HardwareDescription.extractFromSystem(Long.MAX_VALUE);
 
@@ -93,6 +95,7 @@ class TaskExecutorToResourceManagerConnectionTest {
                             taskExecutorRegistration.getHardwareDescription();
                     final TaskExecutorMemoryConfiguration 
actualMemoryConfiguration =
                             taskExecutorRegistration.getMemoryConfiguration();
+                    final String nodeID = taskExecutorRegistration.getNodeId();
 
                     assertThat(actualAddress).isEqualTo(TASK_MANAGER_ADDRESS);
                     
assertThat(actualResourceId).isEqualTo(TASK_MANAGER_RESOURCE_ID);
@@ -101,6 +104,7 @@ class TaskExecutorToResourceManagerConnectionTest {
                             .isEqualTo(TASK_MANAGER_HARDWARE_DESCRIPTION);
                     assertThat(actualMemoryConfiguration)
                             .isEqualTo(TASK_MANAGER_MEMORY_CONFIGURATION);
+                    assertThat(nodeID).isEqualTo(TASK_MANAGER_NODE_ID);
 
                     return 
CompletableFuture.completedFuture(successfulRegistration());
                 });
@@ -135,7 +139,8 @@ class TaskExecutorToResourceManagerConnectionTest {
                         TASK_MANAGER_HARDWARE_DESCRIPTION,
                         TASK_MANAGER_MEMORY_CONFIGURATION,
                         ResourceProfile.ZERO,
-                        ResourceProfile.ZERO);
+                        ResourceProfile.ZERO,
+                        TASK_MANAGER_NODE_ID);
         return new TaskExecutorToResourceManagerConnection(
                 LOGGER,
                 rpcService,

Reply via email to