[ 
https://issues.apache.org/jira/browse/FLINK-5103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16652325#comment-16652325
 ] 

ASF GitHub Bot commented on FLINK-5103:
---------------------------------------

zentol closed pull request #2833: [FLINK-5103] [Metrics] TaskManager process 
virtual memory and physical memory used size gauge
URL: https://github.com/apache/flink/pull/2833
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display it otherwise once the pull request is merged):

diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index da8db02df80..32f595dd640 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -70,6 +70,7 @@ public String handleRequest(AccessExecution execAttempt, 
Map<String, String> par
                gen.writeNumberField("start-time", startTime);
                gen.writeNumberField("end-time", endTime);
                gen.writeNumberField("duration", duration);
+               gen.writeStringField("exception", 
execAttempt.getFailureCauseAsString());
 
                IOMetrics ioMetrics = execAttempt.getIOMetrics();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index a10dc3b888a..23e7d27704c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -21,6 +21,7 @@
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.util.Hardware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,6 +182,20 @@ public Long getValue() {
                                }
                        });
                }
+
+               MetricGroup process = metrics.addGroup("Process");
+               process.gauge("VmSize", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return Hardware.getSizeOfProcessVirtualMemory();
+                       }
+               });
+               process.gauge("VmRSS", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return 
Hardware.getSizeOfProcessPhysicalMemory();
+                       }
+               });
        }
 
        private static void instantiateThreadMetrics(MetricGroup metrics) {
@@ -245,5 +260,12 @@ public Long getValue() {
                                }
                        });
                }
+
+               metrics.gauge("Usage", new Gauge<Double>() {
+                       @Override
+                       public Double getValue() {
+                               return Hardware.getProcessCpuUsage();
+                       }
+               });
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java
index 0b0e7ffb2f4..1ff42c3846d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java
@@ -26,6 +26,10 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -45,7 +49,52 @@
 
        private static final Pattern LINUX_MEMORY_REGEX = 
Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
 
+       private static final String LINUX_PROCESS_STATUS_PATH = 
"/proc/self/status";
+
+       private static final Pattern LINUX_PROCESS_VIRTUAL_MEMORY_REGEX = 
Pattern.compile("^VmSize:\\s*(\\d+)\\s+kB$");
+
+       private static final Pattern LINUX_PROCESS_PHYSICAL_MEMORY_REGEX = 
Pattern.compile("^VmRSS:\\s*(\\d+)\\s+kB$");
+
+       private static CpuStatServiceForLinux cpuStatServiceForLinux = null;
+
        // 
------------------------------------------------------------------------
+
+       /**
+        * start hardware specifics service
+        */
+       static {
+               // try the OS specific access paths
+               switch (OperatingSystem.getCurrentOperatingSystem()) {
+                       case LINUX:
+                               cpuStatServiceForLinux = new 
CpuStatServiceForLinux();
+                               break;
+
+                       case WINDOWS:
+                       case MAC_OS:
+                       case FREE_BSD:
+                               break;
+
+                       case UNKNOWN:
+                               LOG.error("Unknown operating system.");
+                               break;
+
+                       default:
+                               LOG.error("Unrecognized OS: " + 
OperatingSystem.getCurrentOperatingSystem());
+               }
+
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Shutdown hardware specifics service
+        */
+       public static void shutdown() {
+               if (cpuStatServiceForLinux != null) {
+                       cpuStatServiceForLinux.shutdown();
+                       cpuStatServiceForLinux = null;
+               }
+       }
        
        /**
         * Gets the number of CPU cores (hardware contexts) that the JVM has 
access to.
@@ -266,6 +315,325 @@ private static long getSizeOfPhysicalMemoryForWindows() {
                }
        }
 
+       /**
+        * Returns the size of the process virtual memory in bytes.
+        *
+        * @return the size of the process virtual memory in bytes or {@code 
-1}, if
+        *         the size could not be determined.
+        */
+       public static long getSizeOfProcessVirtualMemory() {
+               // try the OS specific access paths
+               switch (OperatingSystem.getCurrentOperatingSystem()) {
+                       case LINUX:
+                               return getSizeOfProcessVirtualMemoryForLinux();
+
+                       case WINDOWS:
+                       case MAC_OS:
+                       case FREE_BSD:
+                               LOG.warn("Unsupported access process virtual 
memory for OS:" + OperatingSystem.getCurrentOperatingSystem());
+                               return -1;
+
+                       case UNKNOWN:
+                               LOG.error("Cannot determine size of process 
virtual memory for unknown operating system");
+                               return -1;
+
+                       default:
+                               LOG.error("Unrecognized OS: " + 
OperatingSystem.getCurrentOperatingSystem());
+                               return -1;
+               }
+       }
+
+       /**
+        * Returns the size of the process physical memory in bytes.
+        *
+        * @return the size of the process physical memory in bytes or {@code 
-1}, if
+        *         the size could not be determined.
+        */
+       public static long getSizeOfProcessPhysicalMemory() {
+               // try the OS specific access paths
+               switch (OperatingSystem.getCurrentOperatingSystem()) {
+                       case LINUX:
+                               return getSizeOfProcessPhysicalMemoryForLinux();
+
+                       case WINDOWS:
+                       case MAC_OS:
+                       case FREE_BSD:
+                               LOG.warn("Unsupported access process physical 
memory for OS:" + OperatingSystem.getCurrentOperatingSystem());
+                               return -1;
+
+                       case UNKNOWN:
+                               LOG.error("Cannot determine size of process 
physical memory for unknown operating system");
+                               return -1;
+
+                       default:
+                               LOG.error("Unrecognized OS: " + 
OperatingSystem.getCurrentOperatingSystem());
+                               return -1;
+               }
+       }
+
+       /**
+        * Returns the size of the process virtual memory in bytes on a 
Linux-based
+        * operating system.
+        *
+        * @return the size of the process virtual memory in bytes or {@code 
-1}, if
+        *         the size could not be determined
+        */
+       private static long getSizeOfProcessVirtualMemoryForLinux() {
+               try {
+                       String virtualMemory = 
getProcessStatusForLinux(LINUX_PROCESS_VIRTUAL_MEMORY_REGEX);
+                       return Long.parseLong(virtualMemory) * 1024L; // 
Convert from kilobyte to byte
+               }
+               catch (NumberFormatException e) {
+                       LOG.error("Cannot determine the size of the process 
virtual memory(VmSize) for Linux host. " +
+                                       "Not a number format.");
+                       return -1;
+               }
+       }
+
+       /**
+        * Returns the size of the process physical memory in bytes on a 
Linux-based
+        * operating system.
+        *
+        * @return the size of the process physical memory in bytes or {@code 
-1}, if
+        *         the size could not be determined
+        */
+       private static long getSizeOfProcessPhysicalMemoryForLinux() {
+               try {
+                       String physicalMemory = 
getProcessStatusForLinux(LINUX_PROCESS_PHYSICAL_MEMORY_REGEX);
+                       return Long.parseLong(physicalMemory) * 1024L; // 
Convert from kilobyte to byte
+               }
+               catch (NumberFormatException e) {
+                       LOG.error("Cannot determine the size of the process 
physical memory(VmRSS) for Linux host. " +
+                                       "Not a number format.");
+                       return -1;
+               }
+       }
+
+       /**
+        * Returns the process status value which pattern matched on a 
Linux-based
+        * operating system.
+        *
+        * @param pattern The pattern to match the process status.
+        * @return the process status value which pattern matched or {@code 
""}, if
+        *         the pattern unmatched
+        */
+       private static String getProcessStatusForLinux(Pattern pattern) {
+               try (BufferedReader lineReader = new BufferedReader(new 
FileReader(LINUX_PROCESS_STATUS_PATH))) {
+                       String line;
+                       while ((line = lineReader.readLine()) != null) {
+                               Matcher matcher = pattern.matcher(line);
+                               if (matcher.matches()) {
+                                       return matcher.group(1);
+                               }
+                       }
+                       // expected line did not come
+                       LOG.error("Cannot determine process '" + pattern + "' 
for Linux host (using '" + LINUX_PROCESS_STATUS_PATH
+                                       + "'). " + "Unexpected format.");
+                       return "";
+               }
+               catch (Throwable t) {
+                       LOG.error("Cannot determine process '" + pattern + "' 
for Linux host (using '" + LINUX_PROCESS_STATUS_PATH
+                                       + "').", t);
+                       return "";
+               }
+       }
+
+       /**
+        * Returns the process cpu usage.
+        *
+        * @return the process cpu usage or {@code 0.0}, if
+        *         the cpu usage could not be determined.
+        */
+       public static double getProcessCpuUsage() {
+               // try the OS specific access paths
+               switch (OperatingSystem.getCurrentOperatingSystem()) {
+                       case LINUX:
+                               return 
cpuStatServiceForLinux.getProcessCpuUsage();
+
+                       case WINDOWS:
+                       case MAC_OS:
+                       case FREE_BSD:
+                               LOG.warn("Unsupported access process cpu usage 
for OS:" + OperatingSystem.getCurrentOperatingSystem());
+                               return 0.0;
+
+                       case UNKNOWN:
+                               LOG.error("Cannot determine process cpu usage 
for unknown operating system");
+                               return 0.0;
+
+                       default:
+                               LOG.error("Unrecognized OS: " + 
OperatingSystem.getCurrentOperatingSystem());
+                               return 0.0;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Service to extract CPU specifics on Linux host.
+        */
+       private static class CpuStatServiceForLinux {
+
+               private static final String LINUX_TOTAL_CPU_STAT_PATH = 
"/proc/stat";
+
+               private static final String LINUX_PROCESS_CPU_STAT_PATH = 
"/proc/self/stat";
+
+               /** The Number of CPU cores */
+               private static final int CPU_CORES_NUMBER = 
Runtime.getRuntime().availableProcessors();
+
+               /** Executor Service to collect CPU specifics periodically */
+               private final ScheduledExecutorService scheduleExecutor;
+
+               /** total cpu clock of last collect */
+               private double lastTotalCpuClock = 0.0;
+
+               /** self process cpu clock of last collect */
+               private double lastProcessCpuClock = 0.0;
+
+               /** self process cpu usage of last collect */
+               private double processCpuUsage = 0.0;
+
+               // 
--------------------------------------------------------------------------------------------
+
+               public CpuStatServiceForLinux() {
+                       // start Executor Service to collect CPU specifics 
periodically if on Linux host
+                       if (OperatingSystem.getCurrentOperatingSystem() == 
OperatingSystem.LINUX) {
+                               scheduleExecutor = 
Executors.newSingleThreadScheduledExecutor();
+                               scheduleExecutor.scheduleWithFixedDelay(new 
TimerTask() {
+                                       @Override
+                                       public void run() {
+                                               // collect cup usage 
periodically
+                                               collectLastCpuUsage();
+                                       }
+                               }, 10, 10, TimeUnit.SECONDS);
+                       } else {
+                               scheduleExecutor = null;
+                       }
+               }
+
+               /**
+                * shutdown Executor Service to collect CPU specifics
+                */
+               public void shutdown() {
+                       if (scheduleExecutor != null) {
+                               scheduleExecutor.shutdown();
+                       }
+               }
+
+               /**
+                * Returns the process cpu usage.
+                *
+                * @return the process cpu usage.
+                */
+               public double getProcessCpuUsage() {
+                       return processCpuUsage;
+               }
+
+               /**
+                * Collect last CPU usage on Linux host
+                */
+               private void collectLastCpuUsage() {
+                       try {
+                               double currentTotalCpuClock = 
getTotalCpuClock();
+                               double currentProcessCpuClock = 
getProcessCpuClock();
+
+                               if ((currentTotalCpuClock - lastTotalCpuClock) 
== 0.0) {
+                                       processCpuUsage = 0.0;
+                               } else {
+                                       processCpuUsage = 
((currentProcessCpuClock - lastProcessCpuClock) / (currentTotalCpuClock - 
lastTotalCpuClock))
+                                                       * CPU_CORES_NUMBER;
+                               }
+
+                               lastTotalCpuClock = currentTotalCpuClock;
+                               lastProcessCpuClock = currentProcessCpuClock;
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Collect last cpu usage for Linux 
host failure.", t);
+                       }
+               }
+
+               /**
+                * Returns the self process cpu clock on Linux host.
+                *
+                * @return the self process cpu clock.
+                */
+               private double getProcessCpuClock() throws IOException, 
NumberFormatException {
+                       String processCpuStat = 
getFirstLineFromFile(LINUX_PROCESS_CPU_STAT_PATH);
+                       if (processCpuStat == null) {
+                               throw new IOException("Cannot read file '" + 
LINUX_PROCESS_CPU_STAT_PATH + "' for Linux host.");
+                       }
+
+                       String[] cpuStats = processCpuStat.split(" ", -1);
+                       if (cpuStats.length < 17) {
+                               LOG.error("Cannot determine process cpu stat '" 
+ processCpuStat + "' for Linux host (using '" + LINUX_PROCESS_CPU_STAT_PATH
+                                               + "'). " + "Unexpected 
format.");
+                               throw new IOException("Parse process cpu stat 
'" + processCpuStat + "' for Linux host failed. Unexpected format.");
+                       }
+
+                       int rightBracketPos = -1;
+                       for (int i = cpuStats.length - 1; i > 0; i--) {
+                               if (cpuStats[i].contains(")")) {
+                                       rightBracketPos = i;
+                                       break;
+                               }
+                       }
+                       if (rightBracketPos == -1) {
+                               throw new IOException("Process cpu stat '" + 
processCpuStat + "' has no right bracket for Linux host. Unexpected format");
+                       }
+
+                       double processCpuClock = 0.0;
+                       for (int i = rightBracketPos + 12; i < rightBracketPos 
+ 16; i++) {
+                               processCpuClock += 
Double.parseDouble(cpuStats[i]);
+                       }
+
+                       return processCpuClock;
+               }
+
+               /**
+                * Returns the total process cpu clock on Linux host.
+                *
+                * @return the total process cpu clock.
+                */
+               private double getTotalCpuClock() throws IOException, 
NumberFormatException {
+                       String totalCpuStat = 
getFirstLineFromFile(LINUX_TOTAL_CPU_STAT_PATH);
+                       if (totalCpuStat == null) {
+                               throw new IOException("Cannot read file '" + 
LINUX_TOTAL_CPU_STAT_PATH + "' for Linux host.");
+                       }
+
+                       String[] cpuStats = totalCpuStat.split(" ", -1);
+                       if (cpuStats.length < 11) {
+                               LOG.error("Cannot determine total cpu stat '" + 
totalCpuStat + "' for Linux host (using '" + LINUX_TOTAL_CPU_STAT_PATH
+                                               + "'). " + "Unexpected 
format.");
+                               throw new IOException("Parse total cpu stat '" 
+ totalCpuStat + "' for Linux host failed. Unexpected format.");
+                       }
+
+                       double totalCpuClock = 0.0;
+                       for (int i = 2; i < cpuStats.length; i++) {
+                               totalCpuClock += 
Double.parseDouble(cpuStats[i]);
+                       }
+
+                       return totalCpuClock;
+               }
+
+               /**
+                * Returns the first line of cpu stat file on a Linux-based
+                * operating system.
+                *
+                * @param fileName The cpu stat file.
+                * @return the first line of cpu stat file or {@code null}, if
+                *         the file cannot read
+                */
+               private String getFirstLineFromFile(String fileName) {
+                       try (BufferedReader lineReader = new BufferedReader(new 
FileReader(fileName))) {
+                               String line = lineReader.readLine();
+                               return line;
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Cannot read file '" + fileName + "' 
for Linux host.", t);
+                               return null;
+                       }
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private Hardware() {}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 41d307732c9..1a1b78c9ba4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -268,6 +268,12 @@ class TaskManager(
       case t: Exception => log.error("MetricRegistry did not shutdown 
properly.", t)
     }
 
+    try {
+      Hardware.shutdown()
+    } catch {
+      case t: Exception => log.error("Hardware did not shutdown properly.", t)
+    }
+
     log.info(s"Task manager ${self.path} is completely shut down.")
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TaskManager process virtual memory and physical memory used size gauge
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5103
>                 URL: https://issues.apache.org/jira/browse/FLINK-5103
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>            Reporter: zhuhaifeng
>            Assignee: zhuhaifeng
>            Priority: Minor
>              Labels: pull-request-available
>
> Add gauge metrics for the TaskManager process's used virtual memory and
> physical memory size.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to