This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit eab0a1faf5e7ecf8da641880b8913d49ac19da2b Author: Lijie Wang <[email protected]> AuthorDate: Tue Jun 28 22:55:43 2022 +0800 [FLINK-28142][runtime] Enrich TaskExecutorRegistration with node information This closes #20056. --- .../flink/runtime/resourcemanager/ResourceManager.java | 3 ++- .../resourcemanager/TaskExecutorRegistration.java | 17 ++++++++++++++++- .../registration/WorkerRegistration.java | 10 +++++++++- .../apache/flink/runtime/taskexecutor/TaskExecutor.java | 3 ++- .../ResourceManagerPartitionLifecycleTest.java | 3 ++- .../ResourceManagerTaskExecutorTest.java | 9 ++++++--- .../runtime/resourcemanager/ResourceManagerTest.java | 3 ++- .../active/ActiveResourceManagerTest.java | 3 ++- .../TaskExecutorToResourceManagerConnectionTest.java | 7 ++++++- 9 files changed, 47 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index c94d6f60a03..c845d997e0d 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -942,7 +942,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> taskExecutorRegistration.getHardwareDescription(), taskExecutorRegistration.getMemoryConfiguration(), taskExecutorRegistration.getTotalResourceProfile(), - taskExecutorRegistration.getDefaultSlotResourceProfile()); + taskExecutorRegistration.getDefaultSlotResourceProfile(), + taskExecutorRegistration.getNodeId()); log.info( "Registering TaskManager with ResourceID {} ({}) at ResourceManager", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java index 771084f5ee0..b3b9c51a3f5 100644 
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.resourcemanager; +import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.HardwareDescription; @@ -55,6 +56,14 @@ public class TaskExecutorRegistration implements Serializable { /** The task executor total resource profile. */ private final ResourceProfile totalResourceProfile; + /** + * ID of the node on which the TaskManager is located. In Yarn and Native Kubernetes modes, this + * value is set by the resource manager when it launches this TaskManager (via the config option + * {@link TaskManagerOptionsInternal#TASK_MANAGER_NODE_ID}). In other modes, this value is + * the external address of the TaskManager.
+ */ + private final String nodeId; + public TaskExecutorRegistration( final String taskExecutorAddress, final ResourceID resourceId, @@ -63,7 +72,8 @@ public class TaskExecutorRegistration implements Serializable { final HardwareDescription hardwareDescription, final TaskExecutorMemoryConfiguration memoryConfiguration, final ResourceProfile defaultSlotResourceProfile, - final ResourceProfile totalResourceProfile) { + final ResourceProfile totalResourceProfile, + final String nodeId) { this.taskExecutorAddress = checkNotNull(taskExecutorAddress); this.resourceId = checkNotNull(resourceId); this.dataPort = dataPort; @@ -72,6 +82,7 @@ public class TaskExecutorRegistration implements Serializable { this.memoryConfiguration = checkNotNull(memoryConfiguration); this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile); this.totalResourceProfile = checkNotNull(totalResourceProfile); + this.nodeId = checkNotNull(nodeId); } public String getTaskExecutorAddress() { @@ -105,4 +116,8 @@ public class TaskExecutorRegistration implements Serializable { public ResourceProfile getTotalResourceProfile() { return totalResourceProfile; } + + public String getNodeId() { + return nodeId; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java index f5e77b4a0f5..79f402e4cd1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java @@ -43,6 +43,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> private final ResourceProfile defaultSlotResourceProfile; + private final String nodeId; + public WorkerRegistration( TaskExecutorGateway taskExecutorGateway, WorkerType worker, @@ -51,7 +53,8 @@ public class 
WorkerRegistration<WorkerType extends ResourceIDRetrievable> HardwareDescription hardwareDescription, TaskExecutorMemoryConfiguration memoryConfiguration, ResourceProfile totalResourceProfile, - ResourceProfile defaultSlotResourceProfile) { + ResourceProfile defaultSlotResourceProfile, + String nodeId) { super(worker.getResourceID(), taskExecutorGateway); @@ -62,6 +65,7 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration); this.totalResourceProfile = Preconditions.checkNotNull(totalResourceProfile); this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile); + this.nodeId = Preconditions.checkNotNull(nodeId); } public WorkerType getWorker() { @@ -91,4 +95,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> public ResourceProfile getTotalResourceProfile() { return totalResourceProfile; } + + public String getNodeId() { + return nodeId; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 55bc617f5b5..a7df344756c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1350,7 +1350,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { hardwareDescription, memoryConfiguration, taskManagerConfiguration.getDefaultSlotResourceProfile(), - taskManagerConfiguration.getTotalResourceProfile()); + taskManagerConfiguration.getTotalResourceProfile(), + unresolvedTaskManagerLocation.getNodeId()); resourceManagerConnection = new TaskExecutorToResourceManagerConnection( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java index c527fdeca31..d66dfb85067 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java @@ -195,7 +195,8 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger { new TaskExecutorMemoryConfiguration( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), ResourceProfile.ZERO, - ResourceProfile.ZERO); + ResourceProfile.ZERO, + taskExecutorAddress); final CompletableFuture<RegistrationResponse> registrationFuture = resourceManagerGateway.registerTaskExecutor( taskExecutorRegistration, TestingUtils.TIMEOUT); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index f50ecc366f1..9f14d33b1cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -238,7 +238,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { new TaskExecutorMemoryConfiguration( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), DEFAULT_SLOT_PROFILE, - DEFAULT_SLOT_PROFILE); + DEFAULT_SLOT_PROFILE, + taskExecutorGateway.getAddress()); CompletableFuture<RegistrationResponse> firstFuture = rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout); @@ -307,7 +308,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { new TaskExecutorMemoryConfiguration( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), DEFAULT_SLOT_PROFILE, - DEFAULT_SLOT_PROFILE.multiply(numberSlots)); + DEFAULT_SLOT_PROFILE.multiply(numberSlots), + 
taskExecutorGateway.getAddress()); final RegistrationResponse registrationResponse = rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get(); assertThat(registrationResponse, instanceOf(TaskExecutorRegistrationSuccess.class)); @@ -384,7 +386,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { new TaskExecutorMemoryConfiguration( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), DEFAULT_SLOT_PROFILE, - DEFAULT_SLOT_PROFILE), + DEFAULT_SLOT_PROFILE, + taskExecutorAddress), TIMEOUT); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 0564bbc373d..d8a93a948cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -233,7 +233,8 @@ public class ResourceManagerTest extends TestLogger { new TaskExecutorMemoryConfiguration( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), ResourceProfile.ZERO, - ResourceProfile.ZERO); + ResourceProfile.ZERO, + taskExecutorAddress); final CompletableFuture<RegistrationResponse> registrationFuture = resourceManagerGateway.registerTaskExecutor( taskExecutorRegistration, TestingUtils.TIMEOUT); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java index 6129f9a34b2..08ba16d314c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java @@ -949,7 +949,8 @@ public class ActiveResourceManagerTest extends TestLogger { new HardwareDescription(1, 2L, 3L, 4L), TESTING_CONFIG, 
ResourceProfile.ZERO, - ResourceProfile.ZERO); + ResourceProfile.ZERO, + resourceID.toString()); return resourceManager .getSelfGateway(ResourceManagerGateway.class) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java index 6e4616f7a09..7f153a175e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java @@ -65,6 +65,8 @@ class TaskExecutorToResourceManagerConnectionTest { private static final int TASK_MANAGER_JMX_PORT = 23456; + private static final String TASK_MANAGER_NODE_ID = "local"; + private static final HardwareDescription TASK_MANAGER_HARDWARE_DESCRIPTION = HardwareDescription.extractFromSystem(Long.MAX_VALUE); @@ -93,6 +95,7 @@ class TaskExecutorToResourceManagerConnectionTest { taskExecutorRegistration.getHardwareDescription(); final TaskExecutorMemoryConfiguration actualMemoryConfiguration = taskExecutorRegistration.getMemoryConfiguration(); + final String nodeID = taskExecutorRegistration.getNodeId(); assertThat(actualAddress).isEqualTo(TASK_MANAGER_ADDRESS); assertThat(actualResourceId).isEqualTo(TASK_MANAGER_RESOURCE_ID); @@ -101,6 +104,7 @@ class TaskExecutorToResourceManagerConnectionTest { .isEqualTo(TASK_MANAGER_HARDWARE_DESCRIPTION); assertThat(actualMemoryConfiguration) .isEqualTo(TASK_MANAGER_MEMORY_CONFIGURATION); + assertThat(nodeID).isEqualTo(TASK_MANAGER_NODE_ID); return CompletableFuture.completedFuture(successfulRegistration()); }); @@ -135,7 +139,8 @@ class TaskExecutorToResourceManagerConnectionTest { TASK_MANAGER_HARDWARE_DESCRIPTION, TASK_MANAGER_MEMORY_CONFIGURATION, ResourceProfile.ZERO, - ResourceProfile.ZERO); + ResourceProfile.ZERO, + 
TASK_MANAGER_NODE_ID); return new TaskExecutorToResourceManagerConnection( LOGGER, rpcService,
