[ https://issues.apache.org/jira/browse/STORM-1336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138713#comment-15138713 ]
ASF GitHub Bot commented on STORM-1336: --------------------------------------- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1053#discussion_r52288285 --- Diff: storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java --- @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.container.cgroup; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.container.cgroup.core.CpuCore; +import org.apache.storm.container.cgroup.core.MemoryCore; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class that implements ResourceIsolationInterface that manages cgroups + */ +public class CgroupManager implements ResourceIsolationInterface { + + private static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class); + + private CgroupCenter center; + + private Hierarchy hierarchy; + + private CgroupCommon rootCgroup; + + private static String rootDir; + + private Map conf; + + /** + * initialize intial data structures + * @param conf storm confs + */ + public void prepare(Map conf) throws IOException { + this.conf = conf; + this.rootDir = Config.getCgroupRootDir(this.conf); + if (this.rootDir == null) { + throw new RuntimeException("Check configuration file. The supervisor.cgroup.rootdir is missing."); + } + + File file = new File(Config.getCgroupStormHierarchyDir(conf) + "/" + this.rootDir); + if (!file.exists()) { + LOG.error("{}/{} is not existing.", Config.getCgroupStormHierarchyDir(conf), this.rootDir); + throw new RuntimeException("Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file."); + } + this.center = CgroupCenter.getInstance(); + if (this.center == null) { + throw new RuntimeException("Cgroup error, please check /proc/cgroups"); + } + this.prepareSubSystem(this.conf); + } + + /** + * initalize subsystems + */ + private void prepareSubSystem(Map conf) throws IOException { + List<SubSystemType> subSystemTypes = new LinkedList<>(); + for (String resource : Config.getCgroupStormResources(conf)) { + subSystemTypes.add(SubSystemType.getSubSystem(resource)); + } + + this.hierarchy = center.getHierarchyWithSubSystems(subSystemTypes); + + if (this.hierarchy == null) { + Set<SubSystemType> types = new HashSet<SubSystemType>(); + types.add(SubSystemType.cpu); + this.hierarchy = new Hierarchy(Config.getCgroupStormHierarchyName(conf), types, Config.getCgroupStormHierarchyDir(conf)); + } + this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, this.hierarchy.getRootCgroups()); + + // set upper limit to how much cpu can be used by all workers running on supervisor node. + // This is done so that some cpu cycles will remain free to run the daemons and other miscellaneous OS operations. + CpuCore supervisorRootCPU = (CpuCore) this.rootCgroup.getCores().get(SubSystemType.cpu); + setCpuUsageUpperLimit(supervisorRootCPU, ((Number) this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue()); + } + + /** + * User cfs_period & cfs_quota to control the upper limit use of cpu core e.g. + * If making a process to fully use two cpu cores, set cfs_period_us to + * 100000 and set cfs_quota_us to 200000 + */ + private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) throws IOException { + + if (cpuCoreUpperLimit == -1) { + // No control of cpu usage + cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit); + } else { + cpuCore.setCpuCfsPeriodUs(100000); + cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit * 1000); + } + } + + public void reserveResourcesForWorker(String workerId, Map resourcesMap) throws SecurityException { + Number cpuNum = null; + // The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) + if (this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT) != null) { + cpuNum = (Number) this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT); + } else if(resourcesMap.get("cpu") != null) { + cpuNum = (Number) resourcesMap.get("cpu"); + } + + Number totalMem = null; + // The manually set STORM_WORKER_CGROUP_MEMORY_MB_LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) + if (this.conf.get(Config.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT) != null) { + totalMem = (Number) this.conf.get(Config.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT); + } else if (resourcesMap.get("memory") != null) { + totalMem = (Number) resourcesMap.get("memory"); + } + + CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup); + this.center.createCgroup(workerGroup); + + if (cpuNum != null) { + CpuCore cpuCore = (CpuCore) workerGroup.getCores().get(SubSystemType.cpu); + try { + cpuCore.setCpuShares(cpuNum.intValue()); + } catch (IOException e) { + throw new RuntimeException("Cannot set cpu.shares! Exception: " + e); + } + } + + if (totalMem != null) { + MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory); + try { + memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024)); + } catch (IOException e) { + throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: " + e); --- End diff -- use, e instead of + e > Evalute/Port JStorm cgroup support > ---------------------------------- > > Key: STORM-1336 > URL: https://issues.apache.org/jira/browse/STORM-1336 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Boyang Jerry Peng > Labels: jstorm-merger > > Supports controlling the upper limit of CPU core usage for a worker using > cgroups > Sounds like a good start, will be nice to integrate it with RAS requests too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)