This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d059a  Include memory usage of child processes in physical-memory-mb 
metric (#1530)
c4d059a is described below

commit c4d059a4db6cae86af046b8a28c33c0316a59e69
Author: Ziting <[email protected]>
AuthorDate: Tue Sep 14 17:09:54 2021 -0700

    Include memory usage of child processes in physical-memory-mb metric (#1530)
---
 .../host/PosixCommandBasedStatisticsGetter.java    | 43 ++++++++++++++++++++--
 1 file changed, 40 insertions(+), 3 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
 
b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
index 1178261..8fdff3b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
@@ -18,6 +18,9 @@
  */
 package org.apache.samza.container.host;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,10 +58,44 @@ public class PosixCommandBasedStatisticsGetter implements 
SystemStatisticsGetter
     return psOutput;
   }
 
-  private long getPhysicalMemory() throws IOException {
+  /**
+   * A convenience method to execute shell commands and return all lines of 
their output.
+   *
+   * @param cmdArray the command to run
+   * @return all lines of the output.
+   * @throws IOException
+   */
+  private List<String> getAllCommandOutput(String[] cmdArray) throws 
IOException {
+    Process executable = Runtime.getRuntime().exec(cmdArray);
+    BufferedReader processReader = null;
+    List<String> psOutput;
+
+    try {
+      processReader = new BufferedReader(new 
InputStreamReader(executable.getInputStream()));
+      psOutput = 
processReader.lines().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    } finally {
+      if (processReader != null) {
+        processReader.close();
+      }
+    }
+    return psOutput;
+  }
+
+  private long getTotalPhysicalMemory() throws IOException {
+    // collect all child process ids of the main process that runs the 
application
+    List<String> processIds = getAllCommandOutput(new String[]{"sh", "-c", 
"pgrep -P $PPID"});
+    // add the parent process which is the main process that runs the 
application
+    processIds.add("$PPID");
+    long totalPhysicalMemory = 0;
+    for (String processId : processIds) {
+      totalPhysicalMemory += getPhysicalMemory(processId);
+    }
+    return totalPhysicalMemory;
+  }
 
+  private long getPhysicalMemory(String processId) throws IOException {
     // returns a single long value that represents the rss memory of the 
process.
-    String commandOutput = getCommandOutput(new String[]{"sh", "-c", "ps -o 
rss= -p $PPID"});
+    String commandOutput = getCommandOutput(new String[]{"sh", "-c", 
String.format("ps -o rss= -p %s", processId)});
 
     // this should never happen.
     if (commandOutput == null) {
@@ -74,7 +111,7 @@ public class PosixCommandBasedStatisticsGetter implements 
SystemStatisticsGetter
   @Override
   public SystemMemoryStatistics getSystemMemoryStatistics() {
     try {
-      long memory = getPhysicalMemory();
+      long memory = getTotalPhysicalMemory();
       return new SystemMemoryStatistics(memory);
     } catch (Exception e) {
       log.warn("Error when running ps: ", e);

Reply via email to