another round of changes edits based on comments for zhuoliu and abhishekagarwal87
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9421cd8 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9421cd8 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9421cd8 Branch: refs/heads/master Commit: c9421cd8b712aae7d0aa3dda986de8920c08fe54 Parents: fc063ec Author: Boyang Jerry Peng <[email protected]> Authored: Fri Feb 12 10:44:27 2016 -0600 Committer: Boyang Jerry Peng <[email protected]> Committed: Fri Feb 12 10:44:27 2016 -0600 ---------------------------------------------------------------------- conf/cgconfig.conf.example | 2 +- conf/defaults.yaml | 10 +- .../starter/ResourceAwareExampleTopology.java | 2 +- .../clj/org/apache/storm/daemon/supervisor.clj | 56 +++----- storm-core/src/jvm/org/apache/storm/Config.java | 9 +- .../container/ResourceIsolationInterface.java | 18 ++- .../storm/container/cgroup/CgroupCenter.java | 116 +++++++--------- .../storm/container/cgroup/CgroupCommon.java | 106 +++++++++----- .../container/cgroup/CgroupCommonOperation.java | 1 - .../container/cgroup/CgroupCoreFactory.java | 1 - .../storm/container/cgroup/CgroupManager.java | 139 ++++++++++++------- .../storm/container/cgroup/CgroupOperation.java | 46 +++++- .../storm/container/cgroup/CgroupUtils.java | 74 ++++------ .../storm/container/cgroup/Constants.java | 30 ---- .../apache/storm/container/cgroup/Device.java | 3 + .../storm/container/cgroup/Hierarchy.java | 17 ++- .../storm/container/cgroup/SubSystem.java | 7 +- .../storm/container/cgroup/SubSystemType.java | 40 ++---- .../storm/container/cgroup/SystemOperation.java | 24 +++- .../storm/container/cgroup/core/BlkioCore.java | 122 +++++----------- .../storm/container/cgroup/core/CpuCore.java | 23 ++- .../container/cgroup/core/CpuacctCore.java | 9 +- .../storm/container/cgroup/core/CpusetCore.java | 57 ++++---- .../container/cgroup/core/DevicesCore.java | 37 ++--- .../container/cgroup/core/FreezerCore.java | 5 +- .../storm/container/cgroup/core/MemoryCore.java | 37 +++-- .../storm/container/cgroup/core/NetClsCore.java | 5 +- .../container/cgroup/core/NetPrioCore.java | 7 +- .../src/jvm/org/apache/storm/utils/Utils.java | 7 +- .../clj/org/apache/storm/supervisor_test.clj | 2 +- .../test/jvm/org/apache/storm/TestCgroups.java | 24 +++- .../resource/TestResourceAwareScheduler.java | 3 + 32 files changed, 530 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/conf/cgconfig.conf.example ---------------------------------------------------------------------- diff --git a/conf/cgconfig.conf.example b/conf/cgconfig.conf.example index 555b83a..70ac495 100644 --- a/conf/cgconfig.conf.example +++ b/conf/cgconfig.conf.example @@ -38,4 +38,4 @@ group storm { } cpu { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index e32e6f7..b88d478 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -156,7 +156,7 @@ supervisor.heartbeat.frequency.secs: 5 supervisor.enable: true supervisor.supervisors: [] supervisor.supervisors.commands: [] -supervisor.memory.capacity.mb: 3072.0 +supervisor.memory.capacity.mb: 4096.0 #By convention 1 cpu core should be about 100, but this can be adjusted if needed # using 100 makes it simple to set the desired value to the capacity measurement # for single threaded bolts @@ -263,7 +263,7 @@ topology.state.checkpoint.interval.ms: 1000 # topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases). # Recommended range of 0-29 but no hard limit set. topology.priority: 29 -topology.component.resources.onheap.memory.mb: 128.0 +topology.component.resources.onheap.memory.mb: 256.0 topology.component.resources.offheap.memory.mb: 0.0 topology.component.cpu.pcore.percent: 10.0 topology.worker.max.heap.size.mb: 768.0 @@ -287,14 +287,14 @@ storm.daemon.metrics.reporter.plugins: - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter" storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager" +storm.resource.isolation.plugin.enable: false # Configs for CGroup support storm.cgroup.hierarchy.dir: "/cgroup/storm_resources" storm.cgroup.resources: - - cpu - - memory + - "cpu" + - "memory" storm.cgroup.hierarchy.name: "storm" # Also determines whether the unit tests for cgroup runs. If cgroup.enable is set to false the unit tests for cgroups will not run -storm.cgroup.enable: false storm.supervisor.cgroup.rootdir: "storm" storm.cgroup.cgexec.cmd: "/bin/cgexec" http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java index d4aa304..19efbc5 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java @@ -59,7 +59,7 @@ public class ResourceAwareExampleTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); - SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10); + SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 5); //set cpu requirement spout.setCPULoad(20); //set onheap and offheap memory requirement http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 97f2825..8680f20 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -44,7 +44,6 @@ (:require [metrics.meters :refer [defmeter mark!]]) (:gen-class :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]]) - (:import [org.apache.storm.container.cgroup CgroupManager]) (:require [clojure.string :as str])) (defmeter supervisor:num-workers-launched) @@ -259,7 +258,7 @@ (if (Utils/checkFileExists path) (throw (RuntimeException. (str path " was not deleted")))))) -(defn try-cleanup-worker [conf id] +(defn try-cleanup-worker [conf supervisor id] (try (if (.exists (File. (ConfigUtils/workerRoot conf id))) (do @@ -273,6 +272,8 @@ (ConfigUtils/removeWorkerUserWSE conf id) (remove-dead-worker id) )) + (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) + (.releaseResourcesForWorker (:resource-isolation-manager supervisor) id)) (catch IOException e (log-warn-error e "Failed to cleanup worker " id ". Will retry later")) (catch RuntimeException e @@ -309,9 +310,7 @@ (log-debug "Removing path " path) (.delete (File. path)) (catch Exception e))))) ;; on windows, the supervisor may still holds the lock on the worker directory - (try-cleanup-worker conf id) - (if (conf STORM-CGROUP-ENABLE) - (.shutDownWorker (:cgroup-manager supervisor) id false))) + (try-cleanup-worker conf id)) (log-message "Shut down " (:supervisor-id supervisor) ":" id)) (def SUPERVISOR-ZK-ACLS @@ -354,11 +353,11 @@ :sync-retry (atom 0) :download-lock (Object.) :stormid->profiler-actions (atom {}) - :cgroup-manager (if (conf STORM-CGROUP-ENABLE) - (let [cgroup-manager (.newInstance (Class/forName (conf STORM-RESOURCE-ISOLATION-PLUGIN)))] - (.prepare cgroup-manager conf) + :resource-isolation-manager (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) + (let [resource-isolation-manager (Utils/newInstance (conf STORM-RESOURCE-ISOLATION-PLUGIN))] + (.prepare resource-isolation-manager conf) (log-message "Using resource isolation plugin " (conf STORM-RESOURCE-ISOLATION-PLUGIN)) - cgroup-manager) + resource-isolation-manager) nil) }) @@ -384,8 +383,7 @@ (dofor [[port assignment] reassign-executors] (let [id (new-worker-ids port) storm-id (:storm-id assignment) - ^WorkerResources resources (:resources assignment) - mem-onheap (.get_mem_on_heap resources)] + ^WorkerResources resources (:resources assignment)] ;; This condition checks for required files exist before launching the worker (if (required-topo-files-exist? conf storm-id) (let [pids-path (ConfigUtils/workerPidsRoot conf id) @@ -1088,12 +1086,10 @@ (Utils/addToClasspath [stormjar]) (Utils/addToClasspath topo-classpath)) top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS) - mem-onheap (if (and (.get_mem_on_heap resources) (> (.get_mem_on_heap resources) 0)) ;; not nil and not zero - (int (Math/ceil (.get_mem_on_heap resources))) ;; round up - (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use default value - mem-offheap (if (.get_mem_off_heap resources) - (int (Math/ceil (.get_mem_off_heap resources))) ;; round up - 0) + + mem-onheap (int (Math/ceil (.get_mem_on_heap resources))) + + mem-offheap (int (Math/ceil (.get_mem_off_heap resources))) cpu (int (Math/ceil (.get_cpu resources))) @@ -1121,24 +1117,7 @@ storm-log4j2-conf-dir) Utils/FILE_PATH_SEPARATOR "worker.xml") - cgroup-command (if (conf STORM-CGROUP-ENABLE) - (str/split - (.startNewWorker (:cgroup-manager supervisor) worker-id - (merge - ;; The manually set CGROUP-WORKER-CPU-LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) - (cond - (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT) {"memory" (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT)} - (+ mem-onheap mem-offheap) {"memory" (+ mem-onheap mem-offheap)} - :else nil) - ;; The manually set CGROUP-WORKER-CPU-LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) - (cond - (conf STORM-WORKER-CGROUP-CPU-LIMIT) {"cpu" (conf STORM-WORKER-CGROUP-CPU-LIMIT)} - (not= cpu nil) {"cpu" cpu} - :else nil))) #" ")) - command (concat - (if (conf STORM-CGROUP-ENABLE) - cgroup-command) [(java-cmd) "-cp" classpath topo-worker-logwriter-childopts (str "-Dlogfile.name=" logfilename) @@ -1177,7 +1156,14 @@ worker-id]) command (->> command (map str) - (filter (complement empty?)))] + (filter (complement empty?))) + command (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) + (do + (.reserveResourcesForWorker (:resource-isolation-manager supervisor) worker-id + {"cpu" cpu "memory" (+ mem-onheap mem-offheap)}) + (.getLaunchCommand (:resource-isolation-manager supervisor) worker-id + (java.util.ArrayList. (java.util.Arrays/asList (to-array command))))) + command)] (log-message "Launching worker with command: " (Utils/shellCmd command)) (write-log-metadata! storm-conf user worker-id storm-id port conf) (ConfigUtils/setWorkerUserWSE conf worker-id user) http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index a5c1ea0..ebe435c 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -2196,6 +2196,9 @@ public class Config extends HashMap<String, Object> { public static final Object CLIENT_JAR_TRANSFORMER = "client.jartransformer.class"; + /** + * The plugin to be used for resource isolation + */ @isImplementationOfClass(implementsClass = ResourceIsolationInterface.class) public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin"; @@ -2222,10 +2225,12 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name"; /** - * flag to determine whether to use cgroups + * flag to determine whether to use a resource isolation plugin + * Also determines whether the unit tests for cgroup runs. + * If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run */ @isBoolean - public static final String STORM_CGROUP_ENABLE = "storm.cgroup.enable"; + public static final String STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE = "storm.resource.isolation.plugin.enable"; /** * root directory for cgoups http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java index 8e52bc7..2db9f1b 100644 --- a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java +++ b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java @@ -18,6 +18,7 @@ package org.apache.storm.container; +import java.util.List; import java.util.Map; /** @@ -26,18 +27,25 @@ import java.util.Map; public interface ResourceIsolationInterface { /** + * This function should be used prior to starting the worker to reserve resources for the worker * @param workerId worker id of the worker to start * @param resources set of resources to limit - * @return a String that includes to command on how to start the worker. The string returned from this function - * will be concatenated to the front of the command to launch logwriter/worker in supervisor.clj */ - public String startNewWorker(String workerId, Map resources); + void reserveResourcesForWorker(String workerId, Map resources); /** * This function will be called when the worker needs to shutdown. This function should include logic to clean up after a worker is shutdown * @param workerId worker id to shutdown and clean up after - * @param isKilled whether to actually kill worker */ - public void shutDownWorker(String workerId, boolean isKilled); + void releaseResourcesForWorker(String workerId); + + + /** + * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). This function can be used + * to get the modified command line to launch the worker with resource isolation + * @param existingCommand + * @return new commandline with necessary additions to launch worker with resource isolation + */ + List<String> getLaunchCommand(String workerId, List<String> existingCommand); } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java index f7e7f69..449eaa9 100644 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java @@ -17,6 +17,7 @@ */ package org.apache.storm.container.cgroup; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -41,24 +43,18 @@ public class CgroupCenter implements CgroupOperation { } - /** - * Thread unsafe - * - * @return - */ public synchronized static CgroupCenter getInstance() { - if (instance == null) { + if (CgroupUtils.enabled()) { instance = new CgroupCenter(); + return instance; } - return CgroupUtils.enabled() ? instance : null; + return null; } @Override public List<Hierarchy> getHierarchies() { - Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>(); - - try (FileReader reader = new FileReader(Constants.MOUNT_STATUS_FILE); + try (FileReader reader = new FileReader(CgroupUtils.MOUNT_STATUS_FILE); BufferedReader br = new BufferedReader(reader)) { String str = null; while ((str = br.readLine()) != null) { @@ -69,8 +65,8 @@ public class CgroupCenter implements CgroupOperation { String name = strSplit[0]; String type = strSplit[3]; String dir = strSplit[1]; - Hierarchy h = hierarchies.get(type); - h = new Hierarchy(name, CgroupUtils.analyse(type), dir); + //Some mount options (i.e. rw and relatime) in type are not cgroups related + Hierarchy h = new Hierarchy(name, CgroupUtils.getSubSystemsFromString(type), dir); hierarchies.put(type, h); } return new ArrayList<Hierarchy>(hierarchies.values()); @@ -82,10 +78,8 @@ public class CgroupCenter implements CgroupOperation { @Override public Set<SubSystem> getSubSystems() { - Set<SubSystem> subSystems = new HashSet<SubSystem>(); - - try (FileReader reader = new FileReader(Constants.CGROUP_STATUS_FILE); + try (FileReader reader = new FileReader(CgroupUtils.CGROUP_STATUS_FILE); BufferedReader br = new BufferedReader(reader)){ String str = null; while ((str = br.readLine()) != null) { @@ -94,8 +88,10 @@ public class CgroupCenter implements CgroupOperation { if (type == null) { continue; } - subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]) - , Integer.valueOf(split[3]).intValue() == 1 ? true : false)); + int hierarchyID = Integer.valueOf(split[1]); + int cgroupNum = Integer.valueOf(split[2]); + boolean enable = Integer.valueOf(split[3]).intValue() == 1 ? true : false; + subSystems.add(new SubSystem(type, hierarchyID, cgroupNum, enable)); } return subSystems; } catch (Exception e) { @@ -105,11 +101,10 @@ public class CgroupCenter implements CgroupOperation { } @Override - public boolean enabled(SubSystemType subsystem) { - + public boolean isSubSystemEnabled(SubSystemType subSystemType) { Set<SubSystem> subSystems = this.getSubSystems(); for (SubSystem subSystem : subSystems) { - if (subSystem.getType() == subsystem) { + if (subSystem.getType() == subSystemType) { return true; } } @@ -117,25 +112,17 @@ public class CgroupCenter implements CgroupOperation { } @Override - public Hierarchy busy(SubSystemType subsystem) { - List<Hierarchy> hierarchies = this.getHierarchies(); - for (Hierarchy hierarchy : hierarchies) { - for (SubSystemType type : hierarchy.getSubSystems()) { - if (type == subsystem) { - return hierarchy; - } - } - } - return null; + public Hierarchy getHierarchyWithSubSystem(SubSystemType subSystem) { + return getHierarchyWithSubSystems(Arrays.asList(subSystem)); } @Override - public Hierarchy busy(List<SubSystemType> subSystems) { + public Hierarchy getHierarchyWithSubSystems(List<SubSystemType> subSystems) { List<Hierarchy> hierarchies = this.getHierarchies(); for (Hierarchy hierarchy : hierarchies) { Hierarchy ret = hierarchy; - for (SubSystemType subsystem : subSystems) { - if (!hierarchy.getSubSystems().contains(subsystem)) { + for (SubSystemType subSystem : subSystems) { + if (!hierarchy.getSubSystems().contains(subSystem)) { ret = null; break; } @@ -148,85 +135,82 @@ public class CgroupCenter implements CgroupOperation { } @Override - public Hierarchy mounted(Hierarchy hierarchy) { - - List<Hierarchy> hierarchies = this.getHierarchies(); - if (CgroupUtils.dirExists(hierarchy.getDir())) { + public boolean isMounted(Hierarchy hierarchy) { + if (Utils.CheckDirExists(hierarchy.getDir())) { + List<Hierarchy> hierarchies = this.getHierarchies(); for (Hierarchy h : hierarchies) { if (h.equals(hierarchy)) { - return h; + return true; } } } - return null; + return false; } @Override public void mount(Hierarchy hierarchy) throws IOException { - - if (this.mounted(hierarchy) != null) { - LOG.error("{} is mounted", hierarchy.getDir()); + if (this.isMounted(hierarchy)) { + LOG.error("{} is already mounted", hierarchy.getDir()); return; } - Set<SubSystemType> subsystems = hierarchy.getSubSystems(); - for (SubSystemType type : subsystems) { - if (this.busy(type) != null) { - LOG.error("subsystem: {} is busy", type.name()); - subsystems.remove(type); + Set<SubSystemType> subSystems = hierarchy.getSubSystems(); + for (SubSystemType type : subSystems) { + Hierarchy hierarchyWithSubSystem = this.getHierarchyWithSubSystem(type); + if (hierarchyWithSubSystem != null) { + LOG.error("subSystem: {} is already mounted on hierarchy: {}", type.name(), hierarchyWithSubSystem); + subSystems.remove(type); } } - if (subsystems.size() == 0) { + if (subSystems.size() == 0) { return; } - if (!CgroupUtils.dirExists(hierarchy.getDir())) { + if (!Utils.CheckDirExists(hierarchy.getDir())) { new File(hierarchy.getDir()).mkdirs(); } - String subSystems = CgroupUtils.reAnalyse(subsystems); - SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems); + String subSystemsName = CgroupUtils.subSystemsToString(subSystems); + SystemOperation.mount(subSystemsName, hierarchy.getDir(), "cgroup", subSystemsName); } @Override public void umount(Hierarchy hierarchy) throws IOException { - if (this.mounted(hierarchy) != null) { + if (this.isMounted(hierarchy)) { hierarchy.getRootCgroups().delete(); SystemOperation.umount(hierarchy.getDir()); CgroupUtils.deleteDir(hierarchy.getDir()); + } else { + LOG.error("{} is not mounted", hierarchy.getDir()); } } @Override - public void create(CgroupCommon cgroup) throws SecurityException { + public void createCgroup(CgroupCommon cgroup) throws SecurityException { if (cgroup.isRoot()) { LOG.error("You can't create rootCgroup in this function"); - return; + throw new RuntimeException("You can't create rootCgroup in this function"); } CgroupCommon parent = cgroup.getParent(); while (parent != null) { - if (!CgroupUtils.dirExists(parent.getDir())) { - LOG.error(" {} is not existed", parent.getDir()); - return; + if (!Utils.CheckDirExists(parent.getDir())) { + throw new RuntimeException("Parent " + parent.getDir() + "does not exist"); } parent = parent.getParent(); } Hierarchy h = cgroup.getHierarchy(); - if (mounted(h) == null) { - LOG.error("{} is not mounted", h.getDir()); - return; + if (!isMounted(h)) { + throw new RuntimeException("hierarchy " + h.getDir() + " is not mounted"); } - if (CgroupUtils.dirExists(cgroup.getDir())) { - LOG.error("{} is existed", cgroup.getDir()); - return; + if (Utils.CheckDirExists(cgroup.getDir())) { + throw new RuntimeException("cgroup {} already exists " + cgroup.getDir()); } - //Todo perhaps thrown exception or print out error message is dir is not created successfully if (!(new File(cgroup.getDir())).mkdir()) { - LOG.error("Could not create cgroup dir at {}", cgroup.getDir()); + throw new RuntimeException("Could not create cgroup dir at " + cgroup.getDir()); } } @Override - public void delete(CgroupCommon cgroup) throws IOException { + public void deleteCgroup(CgroupCommon cgroup) throws IOException { cgroup.delete(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java index fbf96ba..b12fcc0 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java @@ -45,12 +45,8 @@ public class CgroupCommon implements CgroupCommonOperation { private final CgroupCommon parent; - private final Map<SubSystemType, CgroupCore> cores; - private final boolean isRoot; - private final Set<CgroupCommon> children = new HashSet<CgroupCommon>(); - private static final Logger LOG = LoggerFactory.getLogger(CgroupCommon.class); public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) { @@ -58,8 +54,6 @@ public class CgroupCommon implements CgroupCommonOperation { this.hierarchy = hierarchy; this.parent = parent; this.dir = parent.getDir() + "/" + name; - this.init(); - cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir); this.isRoot = false; } @@ -71,19 +65,17 @@ public class CgroupCommon implements CgroupCommonOperation { this.hierarchy = hierarchy; this.parent = null; this.dir = dir; - this.init(); - cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir); this.isRoot = true; } @Override public void addTask(int taskId) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, TASKS), String.valueOf(taskId)); } @Override public Set<Integer> getTasks() throws IOException { - List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS)); + List<String> stringTasks = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, TASKS)); Set<Integer> tasks = new HashSet<Integer>(); for (String task : stringTasks) { tasks.add(Integer.valueOf(task)); @@ -93,12 +85,12 @@ public class CgroupCommon implements CgroupCommonOperation { @Override public void addProcs(int pid) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid)); } @Override public Set<Integer> getPids() throws IOException { - List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS)); + List<String> stringPids = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS)); Set<Integer> pids = new HashSet<Integer>(); for (String task : stringPids) { pids.add(Integer.valueOf(task)); @@ -109,41 +101,43 @@ public class CgroupCommon implements CgroupCommonOperation { @Override public void setNotifyOnRelease(boolean flag) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0"); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0"); } @Override public boolean getNotifyOnRelease() throws IOException { - return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false; + return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false; } @Override public void setReleaseAgent(String command) throws IOException { if (!this.isRoot) { + LOG.warn("Cannot set {} in {} since its not the root group", RELEASE_AGENT, this.isRoot); return; } - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, RELEASE_AGENT), command); } @Override public String getReleaseAgent() throws IOException { if (!this.isRoot) { + LOG.warn("Cannot get {} in {} since its not the root group", RELEASE_AGENT, this.isRoot); return null; } - return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0); + return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, RELEASE_AGENT)).get(0); } @Override public void setCgroupCloneChildren(boolean flag) throws IOException { - if (!this.cores.keySet().contains(SubSystemType.cpuset)) { + if (!getCores().keySet().contains(SubSystemType.cpuset)) { return; } - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0"); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0"); } @Override public boolean getCgroupCloneChildren() throws IOException { - return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false; + return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false; } @Override @@ -156,7 +150,7 @@ public class CgroupCommon implements CgroupCommonOperation { sb.append(' '); sb.append(arg); } - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString()); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString()); } public Hierarchy getHierarchy() { @@ -176,6 +170,19 @@ public class CgroupCommon implements CgroupCommonOperation { } public Set<CgroupCommon> getChildren() { + + File file = new File(this.dir); + File[] files = file.listFiles(); + if (files == null) { + LOG.info("{} is not a directory", this.dir); + return null; + } + Set<CgroupCommon> children = new HashSet<CgroupCommon>(); + for (File child : files) { + if (child.isDirectory()) { + children.add(new CgroupCommon(child.getName(), this.hierarchy, this)); + } + } return children; } @@ -184,7 +191,7 @@ public class CgroupCommon implements CgroupCommonOperation { } public Map<SubSystemType, CgroupCore> getCores() { - return cores; + return CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir); } public void delete() throws IOException { @@ -195,7 +202,7 @@ public class CgroupCommon implements CgroupCommonOperation { } private void free() throws IOException { - for (CgroupCommon child : this.children) { + for (CgroupCommon child : getChildren()) { child.free(); } if (this.isRoot) { @@ -210,17 +217,54 @@ public class CgroupCommon implements CgroupCommonOperation { CgroupUtils.deleteDir(this.dir); } - private void init() { - File file = new File(this.dir); - File[] files = file.listFiles(); - if (files == null) { - return; - } - for (File child : files) { - if (child.isDirectory()) { - this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this)); + @Override + public boolean equals(Object o) { + boolean ret = false; + if (o != null && (o instanceof CgroupCommon)) { + + boolean hierarchyFlag =false; + if (((CgroupCommon)o).hierarchy != null && this.hierarchy != null) { + hierarchyFlag = ((CgroupCommon)o).hierarchy.equals(this.hierarchy); + } else if (((CgroupCommon)o).hierarchy == null && this.hierarchy == null) { + hierarchyFlag = true; + } else { + hierarchyFlag = false; + } + + boolean nameFlag = false; + if (((CgroupCommon)o).name != null && this.name != null) { + nameFlag = ((CgroupCommon)o).name.equals(this.name); + } else if (((CgroupCommon)o).name == null && this.name == null) { + nameFlag = true; + } else { + nameFlag = false; } + + boolean dirFlag = false; + if (((CgroupCommon)o).dir != null && this.dir != null) { + dirFlag = ((CgroupCommon)o).dir.equals(this.dir); + } else if (((CgroupCommon)o).dir == null && this.dir == null) { + dirFlag = true; + } else { + dirFlag = false; + } + ret = hierarchyFlag && nameFlag && dirFlag; } + return ret; } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (this.name != null ? this.name.hashCode() : 0); + result = prime * result + (this.hierarchy != null ? this.hierarchy.hashCode() : 0); + result = prime * result + (this.dir != null ? this.dir.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return this.getName(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java index f6b4ece..54368b6 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java @@ -78,5 +78,4 @@ public interface CgroupCommonOperation { * set event control config */ public void setEventControl(String eventFd, String controlFd, String... args) throws IOException; - } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java index 98aedcf..53a8a7f 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java @@ -71,5 +71,4 @@ public class CgroupCoreFactory { } return result; } - } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java index a3dbd9d..8b775be 100644 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java @@ -18,6 +18,7 @@ package org.apache.storm.container.cgroup; +import org.apache.commons.lang.ArrayUtils; import org.apache.storm.Config; import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.container.cgroup.core.CpuCore; @@ -28,6 +29,8 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -35,6 +38,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +/** + * Class that implements ResourceIsolationInterface that manages cgroups + */ public class CgroupManager implements ResourceIsolationInterface { private static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class); @@ -49,16 +55,20 @@ public class CgroupManager implements ResourceIsolationInterface { private Map conf; + /** + * initialize intial data structures + * @param conf storm confs + */ public void prepare(Map conf) throws IOException { this.conf = conf; this.rootDir = Config.getCgroupRootDir(this.conf); if (this.rootDir == null) { - throw new RuntimeException("Check configuration file. The supervisor.cgroup.rootdir is missing."); + throw new RuntimeException("Check configuration file. The storm.supervisor.cgroup.rootdir is missing."); } File file = new File(Config.getCgroupStormHierarchyDir(conf) + "/" + this.rootDir); if (!file.exists()) { - LOG.error("{}/{} is not existing.", Config.getCgroupStormHierarchyDir(conf), this.rootDir); + LOG.error("{} is not existing.", file.getPath()); throw new RuntimeException("Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file."); } this.center = CgroupCenter.getInstance(); @@ -69,6 +79,30 @@ public class CgroupManager implements ResourceIsolationInterface { } /** + * initalize subsystems + */ + private void prepareSubSystem(Map conf) throws IOException { + List<SubSystemType> subSystemTypes = new LinkedList<>(); + for (String resource : Config.getCgroupStormResources(conf)) { + subSystemTypes.add(SubSystemType.getSubSystem(resource)); + } + + this.hierarchy = center.getHierarchyWithSubSystems(subSystemTypes); + + if (this.hierarchy == null) { + Set<SubSystemType> types = new HashSet<SubSystemType>(); + types.add(SubSystemType.cpu); + this.hierarchy = new Hierarchy(Config.getCgroupStormHierarchyName(conf), types, Config.getCgroupStormHierarchyDir(conf)); + } + this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, this.hierarchy.getRootCgroups()); + + // set upper limit to how much cpu can be used by all workers running on supervisor node. + // This is done so that some cpu cycles will remain free to run the daemons and other miscellaneous OS operations. + CpuCore supervisorRootCPU = (CpuCore) this.rootCgroup.getCores().get(SubSystemType.cpu); + setCpuUsageUpperLimit(supervisorRootCPU, ((Number) this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue()); + } + + /** * User cfs_period & cfs_quota to control the upper limit use of cpu core e.g. * If making a process to fully use two cpu cores, set cfs_period_us to * 100000 and set cfs_quota_us to 200000 @@ -84,22 +118,36 @@ public class CgroupManager implements ResourceIsolationInterface { } } - public String startNewWorker(String workerId, Map resourcesMap) throws SecurityException { - Number cpuNum = (Number) resourcesMap.get("cpu"); + public void reserveResourcesForWorker(String workerId, Map resourcesMap) throws SecurityException { + Number cpuNum = null; + // The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) + if (this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT) != null) { + cpuNum = (Number) this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT); + } else if(resourcesMap.get("cpu") != null) { + cpuNum = (Number) resourcesMap.get("cpu"); + } + Number totalMem = null; - if (resourcesMap.get("memory") != null) { + // The manually set STORM_WORKER_CGROUP_MEMORY_MB_LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) + if (this.conf.get(Config.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT) != null) { + totalMem = (Number) this.conf.get(Config.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT); + } else if (resourcesMap.get("memory") != null) { totalMem = (Number) resourcesMap.get("memory"); } - CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, this.rootCgroup); - this.center.create(workerGroup); + CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup); + try { + this.center.createCgroup(workerGroup); + } catch (Exception e) { + LOG.error("Error when creating Cgroup: {}", e); + } if (cpuNum != null) { CpuCore cpuCore = (CpuCore) workerGroup.getCores().get(SubSystemType.cpu); try { cpuCore.setCpuShares(cpuNum.intValue()); } catch (IOException e) { - throw new RuntimeException("Cannot set cpu.shares! Exception: " + e); + throw new RuntimeException("Cannot set cpu.shares! Exception: ", e); } } @@ -108,70 +156,55 @@ public class CgroupManager implements ResourceIsolationInterface { try { memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024)); } catch (IOException e) { - throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: " + e); - } - } - - StringBuilder sb = new StringBuilder(); - - sb.append(this.conf.get(Config.STORM_CGROUP_CGEXEC_CMD)).append(" -g "); - - Iterator<SubSystemType> it = this.hierarchy.getSubSystems().iterator(); - while(it.hasNext()) { - sb.append(it.next().toString()); - if(it.hasNext()) { - sb.append(","); - } else { - sb.append(":"); + throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e); } } - - sb.append(workerGroup.getName()); - - return sb.toString(); } - public void shutDownWorker(String workerId, boolean isKilled) { + public void releaseResourcesForWorker(String workerId) { CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, this.rootCgroup); try { - if (isKilled == false) { - for (Integer pid : workerGroup.getTasks()) { - Utils.kill(pid); - } - Utils.sleepMs(1500); - } Set<Integer> tasks = workerGroup.getTasks(); - if (isKilled == true && !tasks.isEmpty()) { + if (!tasks.isEmpty()) { throw new Exception("Cannot correctly showdown worker CGroup " + workerId + "tasks " + tasks.toString() + " still running!"); } - this.center.delete(workerGroup); + this.center.deleteCgroup(workerGroup); } catch (Exception e) { LOG.error("Exception thrown when shutting worker {} Exception: {}", workerId, e); } } - public void close() throws IOException { - this.center.delete(this.rootCgroup); - } + @Override + public List<String> getLaunchCommand(String workerId, List<String> existingCommand) { - private void prepareSubSystem(Map conf) throws IOException { - List<SubSystemType> subSystemTypes = new LinkedList<>(); - for (String resource : Config.getCgroupStormResources(conf)) { - subSystemTypes.add(SubSystemType.getSubSystem(resource)); + CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup); + + if(!this.rootCgroup.getChildren().contains(workerGroup)) { + LOG.error("cgroup {} doesn't exist! Need to reserve resources for worker first!", workerGroup); + return existingCommand; } - this.hierarchy = center.busy(subSystemTypes); + StringBuilder sb = new StringBuilder(); - if (this.hierarchy == null) { - Set<SubSystemType> types = new HashSet<SubSystemType>(); - types.add(SubSystemType.cpu); - this.hierarchy = new Hierarchy(Config.getCgroupStormHierarchyName(conf), types, Config.getCgroupStormHierarchyDir(conf)); + sb.append(this.conf.get(Config.STORM_CGROUP_CGEXEC_CMD)).append(" -g "); + + Iterator<SubSystemType> it = this.hierarchy.getSubSystems().iterator(); + while(it.hasNext()) { + sb.append(it.next().toString()); + if(it.hasNext()) { + sb.append(","); + } else { + sb.append(":"); + } } - this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, this.hierarchy.getRootCgroups()); + sb.append(workerGroup.getName()); + List<String> newCommand = new ArrayList<String>(); + newCommand.addAll(Arrays.asList(sb.toString().split(" "))); + newCommand.addAll(existingCommand); + return newCommand; + } - // set upper limit to how much cpu can be used by all workers running on supervisor node. - // This is done so that some cpu cycles will remain free to run the daemons and other miscellaneous OS operations. - CpuCore supervisorRootCPU = (CpuCore) this.rootCgroup.getCores().get(SubSystemType.cpu); - setCpuUsageUpperLimit(supervisorRootCPU, ((Number) this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue()); + public void close() throws IOException { + this.center.deleteCgroup(this.rootCgroup); } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java index aa315ba..3626d04 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java @@ -21,26 +21,58 @@ import java.io.IOException; import java.util.List; import java.util.Set; +/** + * An interface to manage cgroups + */ public interface CgroupOperation { + /** + * Get a list of hierarchies + */ public List<Hierarchy> getHierarchies(); + /** + * get a list of available subsystems + */ public Set<SubSystem> getSubSystems(); - public boolean enabled(SubSystemType subsystem); + /** + * Check if a subsystem is enabled + */ + public boolean isSubSystemEnabled(SubSystemType subsystem); - public Hierarchy busy(SubSystemType subsystem); + /** + * get the first hierarchy that has a certain subsystem isMounted + */ + public Hierarchy getHierarchyWithSubSystem(SubSystemType subsystem); - public Hierarchy busy(List<SubSystemType> subSystems); + /** + * get the first hierarchy that has a certain list of subsystems isMounted + */ + public Hierarchy getHierarchyWithSubSystems(List<SubSystemType> subSystems); - public Hierarchy mounted(Hierarchy hierarchy); + /** + * check if a hiearchy is mounted + */ + public boolean isMounted(Hierarchy hierarchy); + /** + * mount a hierarchy + */ public void mount(Hierarchy hierarchy) throws IOException; + /** + * umount a heirarchy + */ public void umount(Hierarchy hierarchy) throws IOException; - public void create(CgroupCommon cgroup) throws SecurityException; - - public void delete(CgroupCommon cgroup) throws IOException; + /** + * create a cgroup + */ + public void createCgroup(CgroupCommon cgroup) throws SecurityException; + /** + * delete a cgroup + */ + public void deleteCgroup(CgroupCommon cgroup) throws IOException; } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java index 7c88f5d..c41b491 100644 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java @@ -17,22 +17,26 @@ */ package org.apache.storm.container.cgroup; +import com.google.common.io.Files; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; -import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; -import java.util.ArrayList; +import java.nio.charset.Charset; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; public class CgroupUtils { + public static final String CGROUP_STATUS_FILE = "/proc/cgroups"; + public static final String MOUNT_STATUS_FILE = "/proc/mounts"; + private static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class); public static void deleteDir(String dir) { @@ -50,20 +54,14 @@ public class CgroupUtils { } } - public static boolean fileExists(String dir) { - File file = new File(dir); - return file.exists(); - } - - public static boolean dirExists(String dir) { - File file = new File(dir); - return file.isDirectory(); - } - - public static Set<SubSystemType> analyse(String str) { + /** + * Get a set of SubSystemType objects from a comma delimited list of subsystem names + */ + public static Set<SubSystemType> getSubSystemsFromString(String str) { Set<SubSystemType> result = new HashSet<SubSystemType>(); String[] subSystems = str.split(","); for (String subSystem : subSystems) { + //return null to mount options in string that is not part of cgroups SubSystemType type = SubSystemType.getSubSystem(subSystem); if (type != null) { result.add(type); @@ -72,7 +70,10 @@ public class CgroupUtils { return result; } - public static String reAnalyse(Set<SubSystemType> subSystems) { + /** + * Get a string that is a comma delimited list of subsystems + */ + public static String subSystemsToString(Set<SubSystemType> subSystems) { StringBuilder sb = new StringBuilder(); if (subSystems.size() == 0) { return sb.toString(); @@ -84,31 +85,23 @@ public class CgroupUtils { } public static boolean enabled() { - return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE); + return Utils.checkFileExists(CGROUP_STATUS_FILE); } - public static List<String> readFileByLine(String fileDir) throws IOException { - List<String> result = new ArrayList<String>(); - File file = new File(fileDir); - try (FileReader fileReader = new FileReader(file); - BufferedReader reader = new BufferedReader(fileReader)) { - String tempString = null; - while ((tempString = reader.readLine()) != null) { - result.add(tempString); - } - } - return result; + public static List<String> readFileByLine(String filePath) throws IOException { + return Files.readLines(new File(filePath), Charset.defaultCharset()); } - public static void writeFileByLine(String fileDir, List<String> strings) throws IOException { - File file = new File(fileDir); + public static void writeFileByLine(String filePath, List<String> linesToWrite) throws IOException { + LOG.debug("For CGroups - writing {} to {} ", linesToWrite, filePath); + File file = new File(filePath); if (!file.exists()) { - LOG.error("{} is no existed", fileDir); + LOG.error("{} does not exist", filePath); return; } try (FileWriter writer = new FileWriter(file, true); BufferedWriter bw = new BufferedWriter(writer)) { - for (String string : strings) { + for (String string : linesToWrite) { bw.write(string); bw.newLine(); bw.flush(); @@ -116,18 +109,11 @@ public class CgroupUtils { } } - public static void writeFileByLine(String fileDir, String string) throws IOException { - LOG.debug("For CGroups - writing {} to {} ", string, fileDir); - File file = new File(fileDir); - if (!file.exists()) { - LOG.error("{} is no existed", fileDir); - return; - } - try (FileWriter writer = new FileWriter(file, true); - BufferedWriter bw = new BufferedWriter(writer)) { - bw.write(string); - bw.newLine(); - bw.flush(); - } + public static void writeFileByLine(String filePath, String lineToWrite) throws IOException { + writeFileByLine(filePath, Arrays.asList(lineToWrite)); + } + + public static String getDir(String dir, String constant) { + return dir + constant; } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java deleted file mode 100755 index 0ce9643..0000000 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.container.cgroup; - -public class Constants { - - public static final String CGROUP_STATUS_FILE = "/proc/cgroups"; - - public static final String MOUNT_STATUS_FILE = "/proc/mounts"; - - public static String getDir(String dir, String constant) { - return dir + constant; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java index 26def4c..57eb8ff 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java @@ -17,6 +17,9 @@ */ package org.apache.storm.container.cgroup; +/** + * a class that represents a device in linux + */ public class Device { public final int major; http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java index 16df384..440531a 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java @@ -19,6 +19,9 @@ package org.apache.storm.container.cgroup; import java.util.Set; +/** + * A class that describes a cgroup hiearchy + */ public class Hierarchy { private final String name; @@ -36,13 +39,19 @@ public class Hierarchy { this.subSystems = subSystems; this.dir = dir; this.rootCgroups = new CgroupCommon(this, dir); - this.type = CgroupUtils.reAnalyse(subSystems); + this.type = CgroupUtils.subSystemsToString(subSystems); } + /** + * get subsystems + */ public Set<SubSystemType> getSubSystems() { return subSystems; } + /** + * get all subsystems in hierarchy as a comma delimited list + */ public String getType() { return type; } @@ -105,7 +114,7 @@ public class Hierarchy { return name; } - public boolean subSystemMounted(SubSystemType subsystem) { + public boolean isSubSystemMounted(SubSystemType subsystem) { for (SubSystemType type : this.subSystems) { if (type == subsystem) { return true; @@ -114,4 +123,8 @@ public class Hierarchy { return false; } + @Override + public String toString() { + return this.dir; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java index ac62e61..e354fb0 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java @@ -17,6 +17,9 @@ */ package org.apache.storm.container.cgroup; +/** + * a class that implements operations that can be performed on a cgroup subsystem + */ public class SubSystem { private SubSystemType type; @@ -70,9 +73,9 @@ public class SubSystem { public boolean equals(Object object) { boolean ret = false; if (object != null && object instanceof SubSystem) { - ret = (this.type.equals(((SubSystem)object).getType()) && this.hierarchyID == ((SubSystem)object).getHierarchyID()); + ret = ((this.type == ((SubSystem)object).getType()) + && (this.hierarchyID == ((SubSystem)object).getHierarchyID())); } return ret; } - } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java index 3c6c020..914abcc 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java @@ -17,42 +17,20 @@ */ package org.apache.storm.container.cgroup; +/** + * A enum class to described the subsystems that can be used + */ public enum SubSystemType { - // net_cls,ns is not supposted in ubuntu + // net_cls,ns is not supported in ubuntu blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio; + public static SubSystemType getSubSystem(String str) { - if (str.equals("blkio")) { - return blkio; - } - else if (str.equals("cpu")) { - return cpu; - } - else if (str.equals("cpuacct")) { - return cpuacct; - } - else if (str.equals("cpuset")) { - return cpuset; - } - else if (str.equals("devices")) { - return devices; - } - else if (str.equals("freezer")) { - return freezer; - } - else if (str.equals("memory")) { - return memory; - } - else if (str.equals("perf_event")) { - return perf_event; - } - else if (str.equals("net_cls")) { - return net_cls; - } - else if (str.equals("net_prio")) { - return net_prio; + try { + return SubSystemType.valueOf(str); + } catch (Exception e) { + return null; } - return null; } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java index ee3517a..6872b4a 100644 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java @@ -24,6 +24,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +/** + * A class that implements system operations for using cgroups + */ public class SystemOperation { private static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class); @@ -31,17 +34,24 @@ public class SystemOperation { public static boolean isRoot() throws IOException { String result = SystemOperation.exec("echo $EUID").substring(0, 1); return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? true : false; - }; + } - public static void mount(String name, String target, String type, String data) throws IOException { + public static void mount(String name, String target, String type, String options) throws IOException { StringBuilder sb = new StringBuilder(); - sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target); + sb.append("mount -t ") + .append(type) + .append(" -o ") + .append(options) + .append(" ") + .append(name) + .append(" ") + .append(target); SystemOperation.exec(sb.toString()); } - public static void umount(String name) throws IOException { + public static void umount(String pathToDir) throws IOException { StringBuilder sb = new StringBuilder(); - sb.append("umount ").append(name); + sb.append("umount ").append(pathToDir); SystemOperation.exec(sb.toString()); } @@ -59,7 +69,7 @@ public class SystemOperation { } return output; } catch (InterruptedException ie) { - throw new IOException(ie.toString()); + throw new IOException(ie); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java index 5522601..c426610 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java @@ -18,7 +18,6 @@ package org.apache.storm.container.cgroup.core; import org.apache.storm.container.cgroup.CgroupUtils; -import org.apache.storm.container.cgroup.Constants; import org.apache.storm.container.cgroup.SubSystemType; import org.apache.storm.container.cgroup.Device; @@ -63,19 +62,19 @@ public class BlkioCore implements CgroupCore { /* weight: 100-1000 */ public void setBlkioWeight(int weight) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT), String.valueOf(weight)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT), String.valueOf(weight)); } public int getBlkioWeight() throws IOException { - return Integer.valueOf(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0)).intValue(); + return Integer.valueOf(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT)).get(0)).intValue(); } public void setBlkioWeightDevice(Device device, int weight) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE), makeContext(device, weight)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT_DEVICE), makeContext(device, weight)); } public Map<Device, Integer> getBlkioWeightDevice() throws IOException { - List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE)); + List<String> strings = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT_DEVICE)); Map<Device, Integer> result = new HashMap<Device, Integer>(); for (String string : strings) { String[] strArgs = string.split(" "); @@ -87,123 +86,79 @@ public class BlkioCore implements CgroupCore { } public void setReadBps(Device device, long bps) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps)); } public Map<Device, Long> getReadBps() throws IOException { - List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE)); - Map<Device, Long> result = new HashMap<Device, Long>(); - for (String string : strings) { - String[] strArgs = string.split(" "); - Device device = new Device(strArgs[0]); - Long bps = Long.valueOf(strArgs[1]); - result.put(device, bps); - } - return result; + return parseConfig(BLKIO_THROTTLE_READ_BPS_DEVICE); } public void setWriteBps(Device device, long bps) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps)); } public Map<Device, Long> getWriteBps() throws IOException { - List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE)); - Map<Device, Long> result = new HashMap<Device, Long>(); - for (String string : strings) { - String[] strArgs = string.split(" "); - Device device = new Device(strArgs[0]); - Long bps = Long.valueOf(strArgs[1]); - result.put(device, bps); - } - return result; + return parseConfig(BLKIO_THROTTLE_WRITE_BPS_DEVICE); } public void setReadIOps(Device device, long iops) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops)); } public Map<Device, Long> getReadIOps() throws IOException { - List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE)); - Map<Device, Long> result = new HashMap<Device, Long>(); - for (String string : strings) { - String[] strArgs = string.split(" "); - Device device = new Device(strArgs[0]); - Long iops = Long.valueOf(strArgs[1]); - result.put(device, iops); - } - return result; + return parseConfig(BLKIO_THROTTLE_READ_IOPS_DEVICE); } public void setWriteIOps(Device device, long iops) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops)); } public Map<Device, Long> getWriteIOps() throws IOException { - List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE)); - Map<Device, Long> result = new HashMap<Device, Long>(); - for (String string : strings) { - String[] strArgs = string.split(" "); - Device device = new Device(strArgs[0]); - Long iops = Long.valueOf(strArgs[1]); - result.put(device, iops); - } - return result; + return parseConfig(BLKIO_THROTTLE_WRITE_IOPS_DEVICE); } public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICED))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICED))); } public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES))); } public Map<Device, Long> getBlkioTime() throws IOException { - Map<Device, Long> result = new HashMap<Device, Long>(); - List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_TIME)); - for (String str : strs) { - String[] strArgs = str.split(" "); - result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1])); - } - return result; + return parseConfig(BLKIO_TIME); } public Map<Device, Long> getBlkioSectors() throws IOException { - Map<Device, Long> result = new HashMap<Device, Long>(); - List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_SECTORS)); - for (String str : strs) { - String[] strArgs = str.split(" "); - result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1])); - } - return result; + return parseConfig(BLKIO_SECTORS); } public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICED))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_SERVICED))); } public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_BYTES))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_SERVICE_BYTES))); } public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_TIME))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_SERVICE_TIME))); } public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_WAIT_TIME))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_WAIT_TIME))); } public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_MERGED))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_MERGED))); } public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException { - return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_QUEUED))); + return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_QUEUED))); } public void resetStats() throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_RESET_STATS), "1"); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_RESET_STATS), "1"); } private String makeContext(Device device, Object data) { @@ -212,6 +167,18 @@ public class BlkioCore implements CgroupCore { return sb.toString(); } + private Map<Device, Long> parseConfig(String config) throws IOException { + List<String> strings = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, config)); + Map<Device, Long> result = new HashMap<Device, Long>(); + for (String string : strings) { + String[] strArgs = string.split(" "); + Device device = new Device(strArgs[0]); + Long value = Long.valueOf(strArgs[1]); + result.put(device, value); + } + return result; + } + private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) { Map<Device, Map<RecordType, Long>> result = new HashMap<Device, Map<RecordType, Long>>(); for (String str : strs) { @@ -236,22 +203,9 @@ public class BlkioCore implements CgroupCore { read, write, sync, async, total; public static RecordType getType(String type) { - if (type.equals("Read")) { - return read; - } - else if (type.equals("Write")) { - return write; - } - else if (type.equals("Sync")) { - return sync; - } - else if (type.equals("Async")) { - return async; - } - else if (type.equals("Total")) { - return total; - } - else { + try { + return RecordType.valueOf(type.toLowerCase()); + } catch (Exception e) { return null; } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java index 054ec0d..1d21251 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java @@ -18,7 +18,6 @@ package org.apache.storm.container.cgroup.core; import org.apache.storm.container.cgroup.CgroupUtils; -import org.apache.storm.container.cgroup.Constants; import org.apache.storm.container.cgroup.SubSystemType; import java.io.IOException; @@ -45,47 +44,47 @@ public class CpuCore implements CgroupCore { } public void setCpuShares(int weight) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_SHARES), String.valueOf(weight)); } public int getCpuShares() throws IOException { - return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0)); + return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_SHARES)).get(0)); } public void setCpuRtRuntimeUs(long us) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US), String.valueOf(us)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_RUNTIME_US), String.valueOf(us)); } public long getCpuRtRuntimeUs() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0)); } public void setCpuRtPeriodUs(long us) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US), String.valueOf(us)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_PERIOD_US), String.valueOf(us)); } public Long getCpuRtPeriodUs() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_PERIOD_US)).get(0)); } public void setCpuCfsPeriodUs(long us) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us)); } public Long getCpuCfsPeriodUs() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0)); } public void setCpuCfsQuotaUs(long us) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us)); + CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us)); } public Long getCpuCfsQuotaUs() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0)); } public Stat getCpuStat() throws IOException { - return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_STAT))); + return new Stat(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_STAT))); } public static class Stat { http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java index 56ae2dc..2e683f4 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java @@ -18,7 +18,6 @@ package org.apache.storm.container.cgroup.core; import org.apache.storm.container.cgroup.CgroupUtils; -import org.apache.storm.container.cgroup.Constants; import org.apache.storm.container.cgroup.SubSystemType; import java.io.IOException; @@ -44,11 +43,11 @@ public class CpuacctCore implements CgroupCore { } public Long getCpuUsage() throws IOException { - return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE)).get(0)); + return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_USAGE)).get(0)); } public Map<StatType, Long> getCpuStat() throws IOException { - List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_STAT)); + List<String> strs = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_STAT)); Map<StatType, Long> result = new HashMap<StatType, Long>(); result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1])); result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1])); @@ -56,7 +55,7 @@ public class CpuacctCore implements CgroupCore { } public Long[] getPerCpuUsage() throws IOException { - String str = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0); + String str = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0); String[] strArgs = str.split(" "); Long[] result = new Long[strArgs.length]; for (int i = 0; i < result.length; i++) { @@ -65,7 +64,7 @@ public class CpuacctCore implements CgroupCore { return result; } - public enum StatType { + public static enum StatType { user, system; }
