This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0c4db2dd577 [FLINK-31510][yarn] Replace deprecated getMemory by getMemorySize
0c4db2dd577 is described below
commit 0c4db2dd577fdfca1a12c115948bf67931d0bcde
Author: slfan1989 <louj1988@@>
AuthorDate: Sun Apr 9 08:54:13 2023 +0800
[FLINK-31510][yarn] Replace deprecated getMemory by getMemorySize
This closes #22207
---
...rocessSpecContainerResourcePriorityAdapter.java | 2 +-
.../apache/flink/yarn/YarnClusterDescriptor.java | 34 +++++++++++-----------
...ssSpecContainerResourcePriorityAdapterTest.java | 4 +--
3 files changed, 20 insertions(+), 20 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java b/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java
index 1fb1bc2b421..bfe4da71e45 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java
@@ -123,7 +123,7 @@ public class
TaskExecutorProcessSpecContainerResourcePriorityAdapter {
taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(),
taskExecutorProcessSpec.getCpuCores().getValue().intValue());
- if (resource.getMemory() > maxContainerResource.getMemory()
+ if (resource.getMemorySize() > maxContainerResource.getMemorySize()
|| resource.getVirtualCores() >
maxContainerResource.getVirtualCores()) {
LOG.warn(
"Requested container resource ({}) exceeds the max
limitation of the Yarn cluster ({}). Will not allocate resource.",
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 965c608e38d..c2d830f9c3b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -672,22 +672,22 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
final String note =
"Please check the 'yarn.scheduler.maximum-allocation-mb' and
the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
- if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
+ if (jobManagerMemoryMb > maximumResourceCapability.getMemorySize()) {
throw new YarnDeploymentException(
"The cluster does not have the requested resources for the
JobManager available!\n"
+ "Maximum Memory: "
- + maximumResourceCapability.getMemory()
+ + maximumResourceCapability.getMemorySize()
+ "MB Requested: "
+ jobManagerMemoryMb
+ "MB. "
+ note);
}
- if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
+ if (taskManagerMemoryMb > maximumResourceCapability.getMemorySize()) {
throw new YarnDeploymentException(
"The cluster does not have the requested resources for the
TaskManagers available!\n"
+ "Maximum Memory: "
- + maximumResourceCapability.getMemory()
+ + maximumResourceCapability.getMemorySize()
+ " Requested: "
+ taskManagerMemoryMb
+ "MB. "
@@ -1196,7 +1196,7 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(clusterSpecification.getMasterMemoryMB());
+ capability.setMemorySize(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(
flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
@@ -1390,12 +1390,12 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
}
private static class ClusterResourceDescription {
- public final int totalFreeMemory;
- public final int containerLimit;
- public final int[] nodeManagersFree;
+ public final long totalFreeMemory;
+ public final long containerLimit;
+ public final long[] nodeManagersFree;
public ClusterResourceDescription(
- int totalFreeMemory, int containerLimit, int[]
nodeManagersFree) {
+ long totalFreeMemory, long containerLimit, long[]
nodeManagersFree) {
this.totalFreeMemory = totalFreeMemory;
this.containerLimit = containerLimit;
this.nodeManagersFree = nodeManagersFree;
@@ -1407,14 +1407,14 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
int totalFreeMemory = 0;
- int containerLimit = 0;
- int[] nodeManagersFree = new int[nodes.size()];
+ long containerLimit = 0;
+ long[] nodeManagersFree = new long[nodes.size()];
for (int i = 0; i < nodes.size(); i++) {
NodeReport rep = nodes.get(i);
- int free =
- rep.getCapability().getMemory()
- - (rep.getUsed() != null ?
rep.getUsed().getMemory() : 0);
+ long free =
+ rep.getCapability().getMemorySize()
+ - (rep.getUsed() != null ?
rep.getUsed().getMemorySize() : 0);
nodeManagersFree[i] = free;
totalFreeMemory += free;
if (free > containerLimit) {
@@ -1438,14 +1438,14 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
final String format = "|%-16s |%-16s %n";
ps.printf("|Property |Value %n");
ps.println("+---------------------------------------+");
- int totalMemory = 0;
+ long totalMemory = 0;
int totalCores = 0;
for (NodeReport rep : nodes) {
final Resource res = rep.getCapability();
- totalMemory += res.getMemory();
+ totalMemory += res.getMemorySize();
totalCores += res.getVirtualCores();
ps.format(format, "NodeID", rep.getNodeId());
- ps.format(format, "Memory", getDisplayMemory(res.getMemory()));
+ ps.format(format, "Memory",
getDisplayMemory(res.getMemorySize()));
ps.format(format, "vCores", res.getVirtualCores());
ps.format(format, "HealthReport", rep.getHealthReport());
ps.format(format, "Containers", rep.getNumContainers());
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java
index afcaf2ddf7e..1832f160466 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java
@@ -140,7 +140,7 @@ class
TaskExecutorProcessSpecContainerResourcePriorityAdapterTest {
void testGetResourceFromSpec() {
final TaskExecutorProcessSpecContainerResourcePriorityAdapter adapter
= getAdapter();
final Resource resource = getResource(adapter,
TASK_EXECUTOR_PROCESS_SPEC_1);
- assertThat(resource.getMemory())
+ assertThat(resource.getMemorySize())
.isEqualTo(TASK_EXECUTOR_PROCESS_SPEC_1.getTotalProcessMemorySize().getMebiBytes());
assertThat(resource.getVirtualCores())
.isEqualTo(TASK_EXECUTOR_PROCESS_SPEC_1.getCpuCores().getValue().intValue());
@@ -253,7 +253,7 @@ class
TaskExecutorProcessSpecContainerResourcePriorityAdapterTest {
getAdapterWithExternalResources(String resourceName, String
configKey) {
final Resource maxResource =
Resource.newInstance(
- MAX_CONTAINER_RESOURCE.getMemory(),
+ MAX_CONTAINER_RESOURCE.getMemorySize(),
MAX_CONTAINER_RESOURCE.getVirtualCores());
ResourceInformationReflector.INSTANCE.setResourceInformation(
maxResource,