This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7265bfc43530a2bec1299057b704e2b2581716ce
Author: Yangze Guo <[email protected]>
AuthorDate: Wed Nov 25 16:27:18 2020 +0800

    [FLINK-20836][runtime] Add total resource profile and default slot resource profile to WorkerRegistration
---
 .../runtime/resourcemanager/ResourceManager.java |  4 +++-
 .../registration/WorkerRegistration.java         | 19 ++++++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b818889..a2b4977 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -946,7 +946,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                             taskExecutorRegistration.getDataPort(),
                             taskExecutorRegistration.getJmxPort(),
                             taskExecutorRegistration.getHardwareDescription(),
-                            taskExecutorRegistration.getMemoryConfiguration());
+                            taskExecutorRegistration.getMemoryConfiguration(),
+                            taskExecutorRegistration.getTotalResourceProfile(),
+                            taskExecutorRegistration.getDefaultSlotResourceProfile());

             log.info(
                     "Registering TaskManager with ResourceID {} ({}) at ResourceManager",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 06b0d8d..f5e77b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.registration;

 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;

@@ -38,13 +39,19 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>

     private final TaskExecutorMemoryConfiguration memoryConfiguration;

+    private final ResourceProfile totalResourceProfile;
+
+    private final ResourceProfile defaultSlotResourceProfile;
+
     public WorkerRegistration(
             TaskExecutorGateway taskExecutorGateway,
             WorkerType worker,
             int dataPort,
             int jmxPort,
             HardwareDescription hardwareDescription,
-            TaskExecutorMemoryConfiguration memoryConfiguration) {
+            TaskExecutorMemoryConfiguration memoryConfiguration,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {

         super(worker.getResourceID(), taskExecutorGateway);

@@ -53,6 +60,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
         this.jmxPort = jmxPort;
         this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
         this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration);
+        this.totalResourceProfile = Preconditions.checkNotNull(totalResourceProfile);
+        this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
     }

     public WorkerType getWorker() {
@@ -74,4 +83,12 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
     public TaskExecutorMemoryConfiguration getMemoryConfiguration() {
         return memoryConfiguration;
     }
+
+    public ResourceProfile getDefaultSlotResourceProfile() {
+        return defaultSlotResourceProfile;
+    }
+
+    public ResourceProfile getTotalResourceProfile() {
+        return totalResourceProfile;
+    }
 }
