This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 753da61bb36 [FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.
753da61bb36 is described below
commit 753da61bb366df35df4bc2c2ebbbe7f75e8237d3
Author: slfan1989 <louj1988@@>
AuthorDate: Wed Mar 22 19:33:34 2023 +0800
[FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.
---
.../apache/flink/yarn/YARNSessionFIFOITCase.java | 2 +-
.../apache/flink/yarn/YarnClusterDescriptor.java | 28 ++++++++++++++--------
2 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index a57b9b75bc3..8c19fd17e35 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -244,7 +244,7 @@ class YARNSessionFIFOITCase extends YarnTestBase {
log.info("Starting testQueryCluster()");
runWithArgs(
new String[] {"-q"},
- "Summary: totalMemory 8192 totalCores 1332",
+ "Summary: totalMemory 8.000gb (8589934592 bytes) totalCores 1332",
null,
RunTypes.YARN_SESSION,
0); // we have 666*2 cores.
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 95fe09f9c06..8231fd6fa59 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -38,6 +38,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
@@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -844,7 +846,7 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
final ApplicationId appId = appContext.getApplicationId();
- // ------------------ Add Zookeeper namespace to local
flinkConfiguration ------
+ // ------------------ Add Zookeeper namespace to local
flinkConfiguraton ------
setHAClusterIdIfNotSet(configuration, appId);
if
(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
@@ -1229,7 +1231,6 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
- long lastLogTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop:
@@ -1265,11 +1266,9 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " +
appState);
}
- if (System.currentTimeMillis() - lastLogTime > 60000) {
- lastLogTime = System.currentTimeMillis();
+ if (System.currentTimeMillis() - startTime > 60000) {
LOG.info(
- "Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster",
- (lastLogTime - startTime) / 1000);
+ "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
@@ -1443,13 +1442,17 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
totalMemory += res.getMemory();
totalCores += res.getVirtualCores();
ps.format(format, "NodeID", rep.getNodeId());
- ps.format(format, "Memory", res.getMemory() + " MB");
+ ps.format(format, "Memory", getDisplayMemory(res.getMemory()));
ps.format(format, "vCores", res.getVirtualCores());
ps.format(format, "HealthReport", rep.getHealthReport());
ps.format(format, "Containers", rep.getNumContainers());
ps.println("+---------------------------------------+");
}
- ps.println("Summary: totalMemory " + totalMemory + " totalCores "
+ totalCores);
+ ps.println(
+ "Summary: totalMemory "
+ + getDisplayMemory(totalMemory)
+ + " totalCores "
+ + totalCores);
List<QueueInfo> qInfo = yarnClient.getAllQueues();
for (QueueInfo q : qInfo) {
ps.println(
@@ -1875,7 +1878,7 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);
- flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID,
appId.toString());
+ flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID,
ConverterUtils.toString(appId));
setHAClusterIdIfNotSet(flinkConfiguration, appId);
}
@@ -1883,7 +1886,8 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
private void setHAClusterIdIfNotSet(Configuration configuration,
ApplicationId appId) {
// set cluster-id to app id if not specified
if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
- configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID,
appId.toString());
+ configuration.set(
+ HighAvailabilityOptions.HA_CLUSTER_ID,
ConverterUtils.toString(appId));
}
}
@@ -1938,4 +1942,8 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
Utils.setupYarnClassPath(this.yarnConfiguration, env);
return env;
}
+
+ private String getDisplayMemory(long memoryMB) {
+ return MemorySize.ofMebiBytes(memoryMB).toHumanReadableString();
+ }
}