This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 753da61bb36 [FLINK-31230][yarn] Improve YarnClusterDescriptor memory 
unit display.
753da61bb36 is described below

commit 753da61bb366df35df4bc2c2ebbbe7f75e8237d3
Author: slfan1989 <louj1988@@>
AuthorDate: Wed Mar 22 19:33:34 2023 +0800

    [FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.
---
 .../apache/flink/yarn/YARNSessionFIFOITCase.java   |  2 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 28 ++++++++++++++--------
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index a57b9b75bc3..8c19fd17e35 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -244,7 +244,7 @@ class YARNSessionFIFOITCase extends YarnTestBase {
                     log.info("Starting testQueryCluster()");
                     runWithArgs(
                             new String[] {"-q"},
-                            "Summary: totalMemory 8192 totalCores 1332",
+                            "Summary: totalMemory 8.000gb (8589934592 bytes) 
totalCores 1332",
                             null,
                             RunTypes.YARN_SESSION,
                             0); // we have 666*2 cores.
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 95fe09f9c06..8231fd6fa59 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -38,6 +38,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestOptions;
@@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -844,7 +846,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
 
         final ApplicationId appId = appContext.getApplicationId();
 
-        // ------------------ Add Zookeeper namespace to local 
flinkConfiguration ------
+        // ------------------ Add Zookeeper namespace to local 
flinkConfiguraton ------
         setHAClusterIdIfNotSet(configuration, appId);
 
         if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
@@ -1229,7 +1231,6 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
 
         LOG.info("Waiting for the cluster to be allocated");
         final long startTime = System.currentTimeMillis();
-        long lastLogTime = System.currentTimeMillis();
         ApplicationReport report;
         YarnApplicationState lastAppState = YarnApplicationState.NEW;
         loop:
@@ -1265,11 +1266,9 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                     if (appState != lastAppState) {
                         LOG.info("Deploying cluster, current state " + 
appState);
                     }
-                    if (System.currentTimeMillis() - lastLogTime > 60000) {
-                        lastLogTime = System.currentTimeMillis();
+                    if (System.currentTimeMillis() - startTime > 60000) {
                         LOG.info(
-                                "Deployment took more than {} seconds. Please 
check if the requested resources are available in the YARN cluster",
-                                (lastLogTime - startTime) / 1000);
+                                "Deployment took more than 60 seconds. Please 
check if the requested resources are available in the YARN cluster");
                     }
             }
             lastAppState = appState;
@@ -1443,13 +1442,17 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                 totalMemory += res.getMemory();
                 totalCores += res.getVirtualCores();
                 ps.format(format, "NodeID", rep.getNodeId());
-                ps.format(format, "Memory", res.getMemory() + " MB");
+                ps.format(format, "Memory", getDisplayMemory(res.getMemory()));
                 ps.format(format, "vCores", res.getVirtualCores());
                 ps.format(format, "HealthReport", rep.getHealthReport());
                 ps.format(format, "Containers", rep.getNumContainers());
                 ps.println("+---------------------------------------+");
             }
-            ps.println("Summary: totalMemory " + totalMemory + " totalCores " 
+ totalCores);
+            ps.println(
+                    "Summary: totalMemory "
+                            + getDisplayMemory(totalMemory)
+                            + " totalCores "
+                            + totalCores);
             List<QueueInfo> qInfo = yarnClient.getAllQueues();
             for (QueueInfo q : qInfo) {
                 ps.println(
@@ -1875,7 +1878,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         flinkConfiguration.setString(RestOptions.ADDRESS, host);
         flinkConfiguration.setInteger(RestOptions.PORT, port);
 
-        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, 
appId.toString());
+        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, 
ConverterUtils.toString(appId));
 
         setHAClusterIdIfNotSet(flinkConfiguration, appId);
     }
@@ -1883,7 +1886,8 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
     private void setHAClusterIdIfNotSet(Configuration configuration, 
ApplicationId appId) {
         // set cluster-id to app id if not specified
         if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
-            configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, 
appId.toString());
+            configuration.set(
+                    HighAvailabilityOptions.HA_CLUSTER_ID, 
ConverterUtils.toString(appId));
         }
     }
 
@@ -1938,4 +1942,8 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         Utils.setupYarnClassPath(this.yarnConfiguration, env);
         return env;
     }
+
+    private String getDisplayMemory(long memoryMB) {
+        return MemorySize.ofMebiBytes(memoryMB).toHumanReadableString();
+    }
 }

Reply via email to