This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c4d059a Include memory usage of child processes in physical-memory-mb
metric (#1530)
c4d059a is described below
commit c4d059a4db6cae86af046b8a28c33c0316a59e69
Author: Ziting <[email protected]>
AuthorDate: Tue Sep 14 17:09:54 2021 -0700
Include memory usage of child processes in physical-memory-mb metric (#1530)
---
.../host/PosixCommandBasedStatisticsGetter.java | 43 ++++++++++++++++++++--
1 file changed, 40 insertions(+), 3 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
index 1178261..8fdff3b 100644
---
a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
+++
b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
@@ -18,6 +18,9 @@
*/
package org.apache.samza.container.host;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,10 +58,44 @@ public class PosixCommandBasedStatisticsGetter implements
SystemStatisticsGetter
return psOutput;
}
- private long getPhysicalMemory() throws IOException {
+ /**
+ * A convenience method to execute shell commands and return all lines of
their output.
+ *
+ * @param cmdArray the command to run
+ * @return all lines of the output.
+ * @throws IOException
+ */
+ private List<String> getAllCommandOutput(String[] cmdArray) throws
IOException {
+ Process executable = Runtime.getRuntime().exec(cmdArray);
+ BufferedReader processReader = null;
+ List<String> psOutput;
+
+ try {
+ processReader = new BufferedReader(new
InputStreamReader(executable.getInputStream()));
+ psOutput =
processReader.lines().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+ } finally {
+ if (processReader != null) {
+ processReader.close();
+ }
+ }
+ return psOutput;
+ }
+
+ private long getTotalPhysicalMemory() throws IOException {
+ // collect all child process ids of the main process that runs the
application
+ List<String> processIds = getAllCommandOutput(new String[]{"sh", "-c",
"pgrep -P $PPID"});
+ // add the parent process which is the main process that runs the
application
+ processIds.add("$PPID");
+ long totalPhysicalMemory = 0;
+ for (String processId : processIds) {
+ totalPhysicalMemory += getPhysicalMemory(processId);
+ }
+ return totalPhysicalMemory;
+ }
+ private long getPhysicalMemory(String processId) throws IOException {
// returns a single long value that represents the rss memory of the
process.
- String commandOutput = getCommandOutput(new String[]{"sh", "-c", "ps -o
rss= -p $PPID"});
+ String commandOutput = getCommandOutput(new String[]{"sh", "-c",
String.format("ps -o rss= -p %s", processId)});
// this should never happen.
if (commandOutput == null) {
@@ -74,7 +111,7 @@ public class PosixCommandBasedStatisticsGetter implements
SystemStatisticsGetter
@Override
public SystemMemoryStatistics getSystemMemoryStatistics() {
try {
- long memory = getPhysicalMemory();
+ long memory = getTotalPhysicalMemory();
return new SystemMemoryStatistics(memory);
} catch (Exception e) {
log.warn("Error when running ps: ", e);