YARN-5776. Checkstyle: MonitoringThread.Run method length is too long ([email protected] via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9449519a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9449519a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9449519a Branch: refs/heads/HDFS-9806 Commit: 9449519a2503c55d9eac8fd7519df28aa0760059 Parents: dd4ed6a Author: Robert Kanter <[email protected]> Authored: Thu Oct 27 14:36:27 2016 -0700 Committer: Robert Kanter <[email protected]> Committed: Thu Oct 27 14:36:38 2016 -0700 ---------------------------------------------------------------------- .../monitor/ContainersMonitorImpl.java | 460 +++++++++++-------- 1 file changed, 279 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9449519a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index a04a914..cd9d6af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -48,10 +48,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +/** + * Monitors containers collecting resource usage and preempting the container + * if it exceeds its limits. + */ public class ContainersMonitorImpl extends AbstractService implements ContainersMonitor { - final static Log LOG = LogFactory + private final static Log LOG = LogFactory .getLog(ContainersMonitorImpl.class); private long monitoringInterval; @@ -66,7 +70,7 @@ public class ContainersMonitorImpl extends AbstractService implements private final ContainerExecutor containerExecutor; private final Dispatcher eventDispatcher; - protected final Context context; + private final Context context; private ResourceCalculatorPlugin resourceCalculatorPlugin; private Configuration conf; private static float vmemRatio; @@ -84,15 +88,18 @@ public class ContainersMonitorImpl extends AbstractService implements private static final long UNKNOWN_MEMORY_LIMIT = -1L; private int nodeCpuPercentageForYARN; + /** + * Type of container metric. + */ @Private - public static enum ContainerMetric { + public enum ContainerMetric { CPU, MEMORY } private ResourceUtilization containersUtilization; // Tracks the aggregated allocation of the currently allocated containers // when queuing of containers at the NMs is enabled. - private ResourceUtilization containersAllocation; + private final ResourceUtilization containersAllocation; private volatile boolean stopped = false; @@ -111,44 +118,47 @@ public class ContainersMonitorImpl extends AbstractService implements } @Override - protected void serviceInit(Configuration conf) throws Exception { + protected void serviceInit(Configuration myConf) throws Exception { + this.conf = myConf; this.monitoringInterval = - conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, - conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS, + this.conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, + this.conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS)); Class<? extends ResourceCalculatorPlugin> clazz = - conf.getClass(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, - conf.getClass( + this.conf.getClass(YarnConfiguration + .NM_CONTAINER_MON_RESOURCE_CALCULATOR, + this.conf.getClass( YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null, ResourceCalculatorPlugin.class), ResourceCalculatorPlugin.class); this.resourceCalculatorPlugin = - ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf); + ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, this.conf); LOG.info(" Using ResourceCalculatorPlugin : " + this.resourceCalculatorPlugin); - processTreeClass = conf.getClass(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null, + processTreeClass = this.conf.getClass( + YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null, ResourceCalculatorProcessTree.class); - this.conf = conf; LOG.info(" Using ResourceCalculatorProcessTree : " + this.processTreeClass); this.containerMetricsEnabled = - conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE, + this.conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE, YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE); this.containerMetricsPeriodMs = - conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, + this.conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); - this.containerMetricsUnregisterDelayMs = conf.getLong( + this.containerMetricsUnregisterDelayMs = this.conf.getLong( YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS, YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS); long configuredPMemForContainers = NodeManagerHardwareUtils.getContainerMemoryMB( - this.resourceCalculatorPlugin, conf) * 1024 * 1024L; + this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L; long configuredVCoresForContainers = - NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, conf); + NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, + this.conf); // Setting these irrespective of whether checks are enabled. Required in // the UI. @@ -157,16 +167,18 @@ public class ContainersMonitorImpl extends AbstractService implements this.maxVCoresAllottedForContainers = configuredVCoresForContainers; // ///////// Virtual memory configuration ////// - vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, + vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); Preconditions.checkArgument(vmemRatio > 0.99f, YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0"); this.maxVmemAllottedForContainers = (long) (vmemRatio * configuredPMemForContainers); - pmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, + pmemCheckEnabled = this.conf.getBoolean( + YarnConfiguration.NM_PMEM_CHECK_ENABLED, YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED); - vmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, + vmemCheckEnabled = this.conf.getBoolean( + YarnConfiguration.NM_VMEM_CHECK_ENABLED, YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED); LOG.info("Physical memory check enabled: " + pmemCheckEnabled); LOG.info("Virtual memory check enabled: " + vmemCheckEnabled); @@ -175,7 +187,7 @@ public class ContainersMonitorImpl extends AbstractService implements LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled); nodeCpuPercentageForYARN = - NodeManagerHardwareUtils.getNodeCpuPercentage(conf); + NodeManagerHardwareUtils.getNodeCpuPercentage(this.conf); if (pmemCheckEnabled) { // Logging if actual pmem cannot be determined. @@ -201,7 +213,7 @@ public class ContainersMonitorImpl extends AbstractService implements 1) + "). Thrashing might happen."); } } - super.serviceInit(conf); + super.serviceInit(this.conf); } private boolean isContainerMonitorEnabled() { @@ -241,12 +253,15 @@ public class ContainersMonitorImpl extends AbstractService implements try { this.monitoringThread.join(); } catch (InterruptedException e) { - ; + LOG.info("ContainersMonitorImpl monitoring thread interrupted"); } } super.serviceStop(); } + /** + * Encapsulates resource requirements of a process and its tree. + */ public static class ProcessTreeInfo { private ContainerId containerId; private String pid; @@ -278,49 +293,49 @@ public class ContainersMonitorImpl extends AbstractService implements this.pid = pid; } - public ResourceCalculatorProcessTree getProcessTree() { + ResourceCalculatorProcessTree getProcessTree() { return this.pTree; } - public void setProcessTree(ResourceCalculatorProcessTree pTree) { - this.pTree = pTree; + void setProcessTree(ResourceCalculatorProcessTree mypTree) { + this.pTree = mypTree; } /** * @return Virtual memory limit for the process tree in bytes */ - public synchronized long getVmemLimit() { + synchronized long getVmemLimit() { return this.vmemLimit; } /** * @return Physical memory limit for the process tree in bytes */ - public synchronized long getPmemLimit() { + synchronized long getPmemLimit() { return this.pmemLimit; } /** * @return Number of cpu vcores assigned */ - public synchronized int getCpuVcores() { + synchronized int getCpuVcores() { return this.cpuVcores; } /** - * Set resource limit for enforcement - * @param pmemLimit + * Set resource limit for enforcement. + * @param myPmemLimit * Physical memory limit for the process tree in bytes - * @param vmemLimit + * @param myVmemLimit * Virtual memory limit for the process tree in bytes - * @param cpuVcores + * @param myCpuVcores * Number of cpu vcores assigned */ - public synchronized void setResourceLimit( - long pmemLimit, long vmemLimit, int cpuVcores) { - this.pmemLimit = pmemLimit; - this.vmemLimit = vmemLimit; - this.cpuVcores = cpuVcores; + synchronized void setResourceLimit( + long myPmemLimit, long myVmemLimit, int myCpuVcores) { + this.pmemLimit = myPmemLimit; + this.vmemLimit = myVmemLimit; + this.cpuVcores = myCpuVcores; } } @@ -354,7 +369,7 @@ public class ContainersMonitorImpl extends AbstractService implements * or if processes in the tree, older than this thread's monitoring * interval, exceed the memory limit. False, otherwise. */ - boolean isProcessTreeOverLimit(String containerId, + private boolean isProcessTreeOverLimit(String containerId, long currentMemUsage, long curMemUsageOfAgedProcesses, long vmemLimit) { @@ -388,7 +403,7 @@ public class ContainersMonitorImpl extends AbstractService implements } private class MonitoringThread extends Thread { - public MonitoringThread() { + MonitoringThread() { super("Container Monitor"); } @@ -425,43 +440,8 @@ public class ContainersMonitorImpl extends AbstractService implements try { String pId = ptInfo.getPID(); - // Initialize any uninitialized processTrees - if (pId == null) { - // get pid from ContainerId - pId = containerExecutor.getProcessId(ptInfo.getContainerId()); - if (pId != null) { - // pId will be null, either if the container is not spawned yet - // or if the container's pid is removed from ContainerExecutor - LOG.debug("Tracking ProcessTree " + pId - + " for the first time"); - - ResourceCalculatorProcessTree pt = - ResourceCalculatorProcessTree. - getResourceCalculatorProcessTree( - pId, processTreeClass, conf); - ptInfo.setPid(pId); - ptInfo.setProcessTree(pt); - - if (containerMetricsEnabled) { - ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs, - containerMetricsUnregisterDelayMs); - usageMetrics.recordProcessId(pId); - } - Container container = context.getContainers().get(containerId); - String[] ipAndHost = containerExecutor.getIpAndHost(container); - if (ipAndHost != null && ipAndHost[0] != null - && ipAndHost[1] != null) { - container.setIpAndHost(ipAndHost); - LOG.info(containerId + "'s ip = " + ipAndHost[0] - + ", and hostname = " + ipAndHost[1]); - } else { - LOG.info("Can not get both ip and hostname: " + Arrays - .toString(ipAndHost)); - } - } - } - // End of initializing any uninitialized processTrees + // Initialize uninitialized process trees + initializeProcessTrees(entry); if (pId == null || !isResourceCalculatorAvailable()) { continue; // processTree cannot be tracked @@ -487,74 +467,11 @@ public class ContainersMonitorImpl extends AbstractService implements continue; } - float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore / - resourceCalculatorPlugin.getNumProcessors(); - - // Multiply by 1000 to avoid losing data when converting to int - int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000 - * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN); - // as processes begin with an age 1, we want to see if there - // are processes more than 1 iteration old. - long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1); - long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1); - long vmemLimit = ptInfo.getVmemLimit(); - long pmemLimit = ptInfo.getPmemLimit(); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Memory usage of ProcessTree %s for container-id %s: ", - pId, containerId.toString()) + - formatUsageString( - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit)); - } - - // Add resource utilization for this container - trackedContainersUtilization.addTo( - (int) (currentPmemUsage >> 20), - (int) (currentVmemUsage >> 20), - milliVcoresUsed / 1000.0f); - - // Add usage to container metrics - if (containerMetricsEnabled) { - ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs, - containerMetricsUnregisterDelayMs).recordMemoryUsage( - (int) (currentPmemUsage >> 20)); - ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs, - containerMetricsUnregisterDelayMs).recordCpuUsage - ((int)cpuUsagePercentPerCore, milliVcoresUsed); - } + recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage, + currentPmemUsage, trackedContainersUtilization); - boolean isMemoryOverLimit = false; - String msg = ""; - int containerExitStatus = ContainerExitStatus.INVALID; - if (isVmemCheckEnabled() - && isProcessTreeOverLimit(containerId.toString(), - currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) { - // Container (the root process) is still alive and overflowing - // memory. - // Dump the process-tree and then clean it up. - msg = formatErrorMessage("virtual", - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit, - pId, containerId, pTree); - isMemoryOverLimit = true; - containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM; - } else if (isPmemCheckEnabled() - && isProcessTreeOverLimit(containerId.toString(), - currentPmemUsage, curRssMemUsageOfAgedProcesses, - pmemLimit)) { - // Container (the root process) is still alive and overflowing - // memory. - // Dump the process-tree and then clean it up. - msg = formatErrorMessage("physical", - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit, - pId, containerId, pTree); - isMemoryOverLimit = true; - containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM; - } + checkLimit(containerId, pId, pTree, ptInfo, + currentVmemUsage, currentPmemUsage); // Accounting the total memory in usage for all containers vmemUsageByAllContainers += currentVmemUsage; @@ -563,32 +480,8 @@ public class ContainersMonitorImpl extends AbstractService implements cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore; cpuUsageTotalCoresByAllContainers += cpuUsagePercentPerCore; - if (isMemoryOverLimit) { - // Virtual or physical memory over limit. Fail the container and - // remove - // the corresponding process tree - LOG.warn(msg); - // warn if not a leader - if (!pTree.checkPidPgrpidForMatch()) { - LOG.error("Killed container process with PID " + pId - + " but it is not a process group leader."); - } - // kill the container - eventDispatcher.getEventHandler().handle( - new ContainerKillEvent(containerId, - containerExitStatus, msg)); - trackingContainers.remove(containerId); - LOG.info("Removed ProcessTree with root " + pId); - } - - ContainerImpl container = - (ContainerImpl) context.getContainers().get(containerId); - NMTimelinePublisher nmMetricsPublisher = - container.getNMTimelinePublisher(); - if (nmMetricsPublisher != null) { - nmMetricsPublisher.reportContainerResourceUsage(container, - currentPmemUsage, cpuUsagePercentPerCore); - } + reportResourceUsage(containerId, currentPmemUsage, + cpuUsagePercentPerCore); } catch (Exception e) { // Log the exception and proceed to the next container. LOG.warn("Uncaught exception in ContainersMonitorImpl " @@ -617,21 +510,226 @@ public class ContainersMonitorImpl extends AbstractService implements } } + /** + * Initialize any uninitialized processTrees. + * @param entry process tree entry to fill in + */ + private void initializeProcessTrees( + Entry<ContainerId, ProcessTreeInfo> entry) { + ContainerId containerId = entry.getKey(); + ProcessTreeInfo ptInfo = entry.getValue(); + String pId = ptInfo.getPID(); + + // Initialize any uninitialized processTrees + if (pId == null) { + // get pid from ContainerId + pId = containerExecutor.getProcessId(ptInfo.getContainerId()); + if (pId != null) { + // pId will be null, either if the container is not spawned yet + // or if the container's pid is removed from ContainerExecutor + LOG.debug("Tracking ProcessTree " + pId + + " for the first time"); + + ResourceCalculatorProcessTree pt = + ResourceCalculatorProcessTree. + getResourceCalculatorProcessTree( + pId, processTreeClass, conf); + ptInfo.setPid(pId); + ptInfo.setProcessTree(pt); + + if (containerMetricsEnabled) { + ContainerMetrics usageMetrics = ContainerMetrics + .forContainer(containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs); + usageMetrics.recordProcessId(pId); + } + + Container container = context.getContainers().get(containerId); + String[] ipAndHost = containerExecutor.getIpAndHost(container); + if (ipAndHost != null && ipAndHost[0] != null + && ipAndHost[1] != null) { + container.setIpAndHost(ipAndHost); + LOG.info(containerId + "'s ip = " + ipAndHost[0] + + ", and hostname = " + ipAndHost[1]); + } else { + LOG.info("Can not get both ip and hostname: " + Arrays + .toString(ipAndHost)); + } + } + } + // End of initializing any uninitialized processTrees + } + + /** + * Record usage metrics. + * @param containerId container id + * @param pId process id + * @param pTree valid process tree entry with CPU measurement + * @param ptInfo process tree info with limit information + * @param currentVmemUsage virtual memory measurement + * @param currentPmemUsage physical memory measurement + * @param trackedContainersUtilization utilization tracker to update + */ + private void recordUsage(ContainerId containerId, String pId, + ResourceCalculatorProcessTree pTree, + ProcessTreeInfo ptInfo, + long currentVmemUsage, long currentPmemUsage, + ResourceUtilization trackedContainersUtilization) { + float cpuUsagePercentPerCore = pTree.getCpuUsagePercent(); + float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore / + resourceCalculatorPlugin.getNumProcessors(); + + // Multiply by 1000 to avoid losing data when converting to int + int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000 + * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN); + long vmemLimit = ptInfo.getVmemLimit(); + long pmemLimit = ptInfo.getPmemLimit(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Memory usage of ProcessTree %s for container-id %s: ", + pId, containerId.toString()) + + formatUsageString( + currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit)); + } + + // Add resource utilization for this container + trackedContainersUtilization.addTo( + (int) (currentPmemUsage >> 20), + (int) (currentVmemUsage >> 20), + milliVcoresUsed / 1000.0f); + + // Add usage to container metrics + if (containerMetricsEnabled) { + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordMemoryUsage( + (int) (currentPmemUsage >> 20)); + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordCpuUsage((int) + cpuUsagePercentPerCore, milliVcoresUsed); + } + } + + /** + * Check resource limits and take actions if the limit is exceeded. + * @param containerId container id + * @param pId process id + * @param pTree valid process tree entry with CPU measurement + * @param ptInfo process tree info with limit information + * @param currentVmemUsage virtual memory measurement + * @param currentPmemUsage physical memory measurement + */ + @SuppressWarnings("unchecked") + private void checkLimit(ContainerId containerId, String pId, + ResourceCalculatorProcessTree pTree, + ProcessTreeInfo ptInfo, + long currentVmemUsage, + long currentPmemUsage) { + boolean isMemoryOverLimit = false; + long vmemLimit = ptInfo.getVmemLimit(); + long pmemLimit = ptInfo.getPmemLimit(); + // as processes begin with an age 1, we want to see if there + // are processes more than 1 iteration old. + long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1); + long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1); + String msg = ""; + int containerExitStatus = ContainerExitStatus.INVALID; + if (isVmemCheckEnabled() + && isProcessTreeOverLimit(containerId.toString(), + currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) { + // Container (the root process) is still alive and overflowing + // memory. + // Dump the process-tree and then clean it up. + msg = formatErrorMessage("virtual", + formatUsageString(currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit), + pId, containerId, pTree); + isMemoryOverLimit = true; + containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM; + } else if (isPmemCheckEnabled() + && isProcessTreeOverLimit(containerId.toString(), + currentPmemUsage, curRssMemUsageOfAgedProcesses, + pmemLimit)) { + // Container (the root process) is still alive and overflowing + // memory. + // Dump the process-tree and then clean it up. + msg = formatErrorMessage("physical", + formatUsageString(currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit), + pId, containerId, pTree); + isMemoryOverLimit = true; + containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM; + } + + if (isMemoryOverLimit) { + // Virtual or physical memory over limit. Fail the container and + // remove + // the corresponding process tree + LOG.warn(msg); + // warn if not a leader + if (!pTree.checkPidPgrpidForMatch()) { + LOG.error("Killed container process with PID " + pId + + " but it is not a process group leader."); + } + // kill the container + eventDispatcher.getEventHandler().handle( + new ContainerKillEvent(containerId, + containerExitStatus, msg)); + trackingContainers.remove(containerId); + LOG.info("Removed ProcessTree with root " + pId); + } + } + + /** + * Report usage metrics to the timeline service. + * @param containerId container id + * @param currentPmemUsage physical memory measurement + * @param cpuUsagePercentPerCore CPU usage + */ + private void reportResourceUsage(ContainerId containerId, + long currentPmemUsage, float cpuUsagePercentPerCore) { + ContainerImpl container = + (ContainerImpl) context.getContainers().get(containerId); + NMTimelinePublisher nmMetricsPublisher = + container.getNMTimelinePublisher(); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.reportContainerResourceUsage(container, + currentPmemUsage, cpuUsagePercentPerCore); + } + } + + /** + * Format string when memory limit has been exceeded. + * @param memTypeExceeded type of memory + * @param usageString general memory usage information string + * @param pId process id + * @param containerId container id + * @param pTree process tree to dump full resource utilization graph + * @return formatted resource usage information + */ private String formatErrorMessage(String memTypeExceeded, - long currentVmemUsage, long vmemLimit, - long currentPmemUsage, long pmemLimit, - String pId, ContainerId containerId, ResourceCalculatorProcessTree pTree) { + String usageString, String pId, ContainerId containerId, + ResourceCalculatorProcessTree pTree) { return - String.format("Container [pid=%s,containerID=%s] is running beyond %s memory limits. ", + String.format("Container [pid=%s,containerID=%s] is " + + "running beyond %s memory limits. ", pId, containerId, memTypeExceeded) + - "Current usage: " + - formatUsageString(currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit) + + "Current usage: " + usageString + ". Killing container.\n" + "Dump of the process-tree for " + containerId + " :\n" + pTree.getProcessTreeDump(); } + /** + * Format memory usage string for reporting. + * @param currentVmemUsage virtual memory usage + * @param vmemLimit virtual memory limit + * @param currentPmemUsage physical memory usage + * @param pmemLimit physical memory limit + * @return formatted memory information + */ private String formatUsageString(long currentVmemUsage, long vmemLimit, long currentPmemUsage, long pmemLimit) { return String.format("%sB of %sB physical memory used; " + @@ -746,7 +844,7 @@ public class ContainersMonitorImpl extends AbstractService implements return this.containersUtilization; } - public void setContainersUtilization(ResourceUtilization utilization) { + private void setContainersUtilization(ResourceUtilization utilization) { this.containersUtilization = utilization; } @@ -858,7 +956,7 @@ public class ContainersMonitorImpl extends AbstractService implements } } - protected void onChangeMonitoringContainerResource( + private void onChangeMonitoringContainerResource( ContainersMonitorEvent monitoringEvent, ContainerId containerId) { ChangeMonitoringContainerResourceEvent changeEvent = (ChangeMonitoringContainerResourceEvent) monitoringEvent; @@ -878,14 +976,14 @@ public class ContainersMonitorImpl extends AbstractService implements changeContainerResource(containerId, changeEvent.getResource()); } - protected void onStopMonitoringContainer( + private void onStopMonitoringContainer( ContainersMonitorEvent monitoringEvent, ContainerId containerId) { LOG.info("Stopping resource-monitoring for " + containerId); updateContainerMetrics(monitoringEvent); trackingContainers.remove(containerId); } - protected void onStartMonitoringContainer( + private void onStartMonitoringContainer( ContainersMonitorEvent monitoringEvent, ContainerId containerId) { ContainerStartMonitoringEvent startEvent = (ContainerStartMonitoringEvent) monitoringEvent; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
