another round of changes

edits based on comments from zhuoliu and abhishekagarwal87


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9421cd8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9421cd8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9421cd8

Branch: refs/heads/master
Commit: c9421cd8b712aae7d0aa3dda986de8920c08fe54
Parents: fc063ec
Author: Boyang Jerry Peng <[email protected]>
Authored: Fri Feb 12 10:44:27 2016 -0600
Committer: Boyang Jerry Peng <[email protected]>
Committed: Fri Feb 12 10:44:27 2016 -0600

----------------------------------------------------------------------
 conf/cgconfig.conf.example                      |   2 +-
 conf/defaults.yaml                              |  10 +-
 .../starter/ResourceAwareExampleTopology.java   |   2 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  |  56 +++-----
 storm-core/src/jvm/org/apache/storm/Config.java |   9 +-
 .../container/ResourceIsolationInterface.java   |  18 ++-
 .../storm/container/cgroup/CgroupCenter.java    | 116 +++++++---------
 .../storm/container/cgroup/CgroupCommon.java    | 106 +++++++++-----
 .../container/cgroup/CgroupCommonOperation.java |   1 -
 .../container/cgroup/CgroupCoreFactory.java     |   1 -
 .../storm/container/cgroup/CgroupManager.java   | 139 ++++++++++++-------
 .../storm/container/cgroup/CgroupOperation.java |  46 +++++-
 .../storm/container/cgroup/CgroupUtils.java     |  74 ++++------
 .../storm/container/cgroup/Constants.java       |  30 ----
 .../apache/storm/container/cgroup/Device.java   |   3 +
 .../storm/container/cgroup/Hierarchy.java       |  17 ++-
 .../storm/container/cgroup/SubSystem.java       |   7 +-
 .../storm/container/cgroup/SubSystemType.java   |  40 ++----
 .../storm/container/cgroup/SystemOperation.java |  24 +++-
 .../storm/container/cgroup/core/BlkioCore.java  | 122 +++++-----------
 .../storm/container/cgroup/core/CpuCore.java    |  23 ++-
 .../container/cgroup/core/CpuacctCore.java      |   9 +-
 .../storm/container/cgroup/core/CpusetCore.java |  57 ++++----
 .../container/cgroup/core/DevicesCore.java      |  37 ++---
 .../container/cgroup/core/FreezerCore.java      |   5 +-
 .../storm/container/cgroup/core/MemoryCore.java |  37 +++--
 .../storm/container/cgroup/core/NetClsCore.java |   5 +-
 .../container/cgroup/core/NetPrioCore.java      |   7 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   7 +-
 .../clj/org/apache/storm/supervisor_test.clj    |   2 +-
 .../test/jvm/org/apache/storm/TestCgroups.java  |  24 +++-
 .../resource/TestResourceAwareScheduler.java    |   3 +
 32 files changed, 530 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/conf/cgconfig.conf.example
----------------------------------------------------------------------
diff --git a/conf/cgconfig.conf.example b/conf/cgconfig.conf.example
index 555b83a..70ac495 100644
--- a/conf/cgconfig.conf.example
+++ b/conf/cgconfig.conf.example
@@ -38,4 +38,4 @@ group storm {
        }
        cpu {
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e32e6f7..b88d478 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -156,7 +156,7 @@ supervisor.heartbeat.frequency.secs: 5
 supervisor.enable: true
 supervisor.supervisors: []
 supervisor.supervisors.commands: []
-supervisor.memory.capacity.mb: 3072.0
+supervisor.memory.capacity.mb: 4096.0
 #By convention 1 cpu core should be about 100, but this can be adjusted if 
needed
 # using 100 makes it simple to set the desired value to the capacity 
measurement
 # for single threaded bolts
@@ -263,7 +263,7 @@ topology.state.checkpoint.interval.ms: 1000
 # topology priority describing the importance of the topology in decreasing 
importance starting from 0 (i.e. 0 is the highest priority and the priority 
importance decreases as the priority number increases).
 # Recommended range of 0-29 but no hard limit set.
 topology.priority: 29
-topology.component.resources.onheap.memory.mb: 128.0
+topology.component.resources.onheap.memory.mb: 256.0
 topology.component.resources.offheap.memory.mb: 0.0
 topology.component.cpu.pcore.percent: 10.0
 topology.worker.max.heap.size.mb: 768.0
@@ -287,14 +287,14 @@ storm.daemon.metrics.reporter.plugins:
      - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
 
 storm.resource.isolation.plugin: 
"org.apache.storm.container.cgroup.CgroupManager"
+storm.resource.isolation.plugin.enable: false
 
 # Configs for CGroup support
 storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
 storm.cgroup.resources:
-    - cpu
-    - memory
+    - "cpu"
+    - "memory"
 storm.cgroup.hierarchy.name: "storm"
 # Also determines whether the unit tests for cgroup runs.  If cgroup.enable is 
set to false the unit tests for cgroups will not run
-storm.cgroup.enable: false
 storm.supervisor.cgroup.rootdir: "storm"
 storm.cgroup.cgexec.cmd: "/bin/cgexec"

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
index d4aa304..19efbc5 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
@@ -59,7 +59,7 @@ public class ResourceAwareExampleTopology {
   public static void main(String[] args) throws Exception {
     TopologyBuilder builder = new TopologyBuilder();
 
-    SpoutDeclarer spout =  builder.setSpout("word", new TestWordSpout(), 10);
+    SpoutDeclarer spout =  builder.setSpout("word", new TestWordSpout(), 5);
     //set cpu requirement
     spout.setCPULoad(20);
     //set onheap and offheap memory requirement

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 97f2825..8680f20 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -44,7 +44,6 @@
   (:require [metrics.meters :refer [defmeter mark!]])
   (:gen-class
     :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] 
void]])
-  (:import [org.apache.storm.container.cgroup CgroupManager])
   (:require [clojure.string :as str]))
 
 (defmeter supervisor:num-workers-launched)
@@ -259,7 +258,7 @@
     (if (Utils/checkFileExists path)
       (throw (RuntimeException. (str path " was not deleted"))))))
 
-(defn try-cleanup-worker [conf id]
+(defn try-cleanup-worker [conf supervisor id]
   (try
     (if (.exists (File. (ConfigUtils/workerRoot conf id)))
       (do
@@ -273,6 +272,8 @@
         (ConfigUtils/removeWorkerUserWSE conf id)
         (remove-dead-worker id)
       ))
+    (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE)
+      (.releaseResourcesForWorker (:resource-isolation-manager supervisor) id))
   (catch IOException e
     (log-warn-error e "Failed to cleanup worker " id ". Will retry later"))
   (catch RuntimeException e
@@ -309,9 +310,7 @@
             (log-debug "Removing path " path)
             (.delete (File. path))
             (catch Exception e))))) ;; on windows, the supervisor may still 
holds the lock on the worker directory
-    (try-cleanup-worker conf id)
-    (if (conf STORM-CGROUP-ENABLE)
-      (.shutDownWorker (:cgroup-manager supervisor) id false)))
+    (try-cleanup-worker conf id))
   (log-message "Shut down " (:supervisor-id supervisor) ":" id))
 
 (def SUPERVISOR-ZK-ACLS
@@ -354,11 +353,11 @@
    :sync-retry (atom 0)
    :download-lock (Object.)
    :stormid->profiler-actions (atom {})
-   :cgroup-manager (if (conf STORM-CGROUP-ENABLE)
-                     (let [cgroup-manager (.newInstance (Class/forName (conf 
STORM-RESOURCE-ISOLATION-PLUGIN)))]
-                       (.prepare cgroup-manager conf)
+   :resource-isolation-manager (if (conf 
STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE)
+                     (let [resource-isolation-manager (Utils/newInstance (conf 
STORM-RESOURCE-ISOLATION-PLUGIN))]
+                       (.prepare resource-isolation-manager conf)
                        (log-message "Using resource isolation plugin " (conf 
STORM-RESOURCE-ISOLATION-PLUGIN))
-                       cgroup-manager)
+                       resource-isolation-manager)
                      nil)
    })
 
@@ -384,8 +383,7 @@
       (dofor [[port assignment] reassign-executors]
         (let [id (new-worker-ids port)
               storm-id (:storm-id assignment)
-              ^WorkerResources resources (:resources assignment)
-              mem-onheap (.get_mem_on_heap resources)]
+              ^WorkerResources resources (:resources assignment)]
           ;; This condition checks for required files exist before launching 
the worker
           (if (required-topo-files-exist? conf storm-id)
             (let [pids-path (ConfigUtils/workerPidsRoot conf id)
@@ -1088,12 +1086,10 @@
                         (Utils/addToClasspath [stormjar])
                         (Utils/addToClasspath topo-classpath))
           top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
-          mem-onheap (if (and (.get_mem_on_heap resources) (> 
(.get_mem_on_heap resources) 0)) ;; not nil and not zero
-                       (int (Math/ceil (.get_mem_on_heap resources))) ;; round 
up
-                       (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use 
default value
-          mem-offheap (if (.get_mem_off_heap resources)
-                        (int (Math/ceil (.get_mem_off_heap resources))) ;; 
round up
-                        0)
+
+          mem-onheap (int (Math/ceil (.get_mem_on_heap resources)))
+
+          mem-offheap (int (Math/ceil (.get_mem_off_heap resources)))
 
           cpu (int (Math/ceil (.get_cpu resources)))
 
@@ -1121,24 +1117,7 @@
                                           storm-log4j2-conf-dir)
                                      Utils/FILE_PATH_SEPARATOR "worker.xml")
 
-          cgroup-command (if (conf STORM-CGROUP-ENABLE)
-                           (str/split
-                             (.startNewWorker (:cgroup-manager supervisor) 
worker-id
-                               (merge
-                                 ;; The manually set CGROUP-WORKER-CPU-LIMIT 
config on supervisor will overwrite resources assigned by RAS (Resource Aware 
Scheduler)
-                                 (cond
-                                   (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT) 
{"memory" (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT)}
-                                   (+ mem-onheap mem-offheap) {"memory" (+ 
mem-onheap mem-offheap)}
-                                   :else nil)
-                                 ;; The manually set CGROUP-WORKER-CPU-LIMIT 
config on supervisor will overwrite resources assigned by RAS (Resource Aware 
Scheduler)
-                                 (cond
-                                   (conf STORM-WORKER-CGROUP-CPU-LIMIT) {"cpu" 
(conf STORM-WORKER-CGROUP-CPU-LIMIT)}
-                                   (not= cpu nil) {"cpu" cpu}
-                                   :else nil))) #" "))
-
           command (concat
-                    (if (conf STORM-CGROUP-ENABLE)
-                      cgroup-command)
                     [(java-cmd) "-cp" classpath
                      topo-worker-logwriter-childopts
                      (str "-Dlogfile.name=" logfilename)
@@ -1177,7 +1156,14 @@
                      worker-id])
           command (->> command
                        (map str)
-                       (filter (complement empty?)))]
+                       (filter (complement empty?)))
+          command (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE)
+                    (do
+                      (.reserveResourcesForWorker (:resource-isolation-manager 
supervisor) worker-id
+                        {"cpu" cpu "memory" (+ mem-onheap mem-offheap)})
+                      (.getLaunchCommand (:resource-isolation-manager 
supervisor) worker-id
+                        (java.util.ArrayList. (java.util.Arrays/asList 
(to-array command)))))
+                    command)]
       (log-message "Launching worker with command: " (Utils/shellCmd command))
       (write-log-metadata! storm-conf user worker-id storm-id port conf)
       (ConfigUtils/setWorkerUserWSE conf worker-id user)

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index a5c1ea0..ebe435c 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -2196,6 +2196,9 @@ public class Config extends HashMap<String, Object> {
     public static final Object CLIENT_JAR_TRANSFORMER = 
"client.jartransformer.class";
 
 
+    /**
+     * The plugin to be used for resource isolation
+     */
     @isImplementationOfClass(implementsClass = 
ResourceIsolationInterface.class)
     public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = 
"storm.resource.isolation.plugin";
 
@@ -2222,10 +2225,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_CGROUP_HIERARCHY_NAME = 
"storm.cgroup.hierarchy.name";
 
     /**
-     * flag to determine whether to use cgroups
+     * flag to determine whether to use a resource isolation plugin
+     * Also determines whether the unit tests for cgroup runs.
+     * If storm.resource.isolation.plugin.enable is set to false the unit 
tests for cgroups will not run
      */
     @isBoolean
-    public static final String STORM_CGROUP_ENABLE = "storm.cgroup.enable";
+    public static final String STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE = 
"storm.resource.isolation.plugin.enable";
 
     /**
      * root directory for cgoups

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java 
b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
index 8e52bc7..2db9f1b 100644
--- 
a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
+++ 
b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.container;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -26,18 +27,25 @@ import java.util.Map;
 public interface ResourceIsolationInterface {
 
     /**
+     * This function should be used prior to starting the worker to reserve 
resources for the worker
      * @param workerId worker id of the worker to start
      * @param resources set of resources to limit
-     * @return a String that includes to command on how to start the worker.  
The string returned from this function
-     * will be concatenated to the front of the command to launch 
logwriter/worker in supervisor.clj
      */
-    public String startNewWorker(String workerId, Map resources);
+    void reserveResourcesForWorker(String workerId, Map resources);
 
     /**
      * This function will be called when the worker needs to shutdown.  This 
function should include logic to clean up after a worker is shutdown
      * @param workerId worker id to shutdown and clean up after
-     * @param isKilled whether to actually kill worker
      */
-    public void shutDownWorker(String workerId, boolean isKilled);
+    void releaseResourcesForWorker(String workerId);
+
+
+    /**
+     * After reserving resources for the worker (i.e. calling 
reserveResourcesForWorker). This function can be used
+     * to get the modified command line to launch the worker with resource 
isolation
+     * @param existingCommand
+     * @return new commandline with necessary additions to launch worker with 
resource isolation
+     */
+    List<String> getLaunchCommand(String workerId, List<String> 
existingCommand);
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
index f7e7f69..449eaa9 100644
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.container.cgroup;
 
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +26,7 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,24 +43,18 @@ public class CgroupCenter implements CgroupOperation {
 
     }
 
-    /**
-     * Thread unsafe
-     * 
-     * @return
-     */
     public synchronized static CgroupCenter getInstance() {
-        if (instance == null) {
+        if (CgroupUtils.enabled()) {
             instance = new CgroupCenter();
+            return instance;
         }
-        return CgroupUtils.enabled() ? instance : null;
+        return null;
     }
 
     @Override
     public List<Hierarchy> getHierarchies() {
-
         Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>();
-
-        try (FileReader reader = new FileReader(Constants.MOUNT_STATUS_FILE);
+        try (FileReader reader = new FileReader(CgroupUtils.MOUNT_STATUS_FILE);
              BufferedReader br = new BufferedReader(reader)) {
             String str = null;
             while ((str = br.readLine()) != null) {
@@ -69,8 +65,8 @@ public class CgroupCenter implements CgroupOperation {
                 String name = strSplit[0];
                 String type = strSplit[3];
                 String dir = strSplit[1];
-                Hierarchy h = hierarchies.get(type);
-                h = new Hierarchy(name, CgroupUtils.analyse(type), dir);
+                //Some mount options (i.e. rw and relatime) in type are not 
cgroups related
+                Hierarchy h = new Hierarchy(name, 
CgroupUtils.getSubSystemsFromString(type), dir);
                 hierarchies.put(type, h);
             }
             return new ArrayList<Hierarchy>(hierarchies.values());
@@ -82,10 +78,8 @@ public class CgroupCenter implements CgroupOperation {
 
     @Override
     public Set<SubSystem> getSubSystems() {
-
         Set<SubSystem> subSystems = new HashSet<SubSystem>();
-
-        try (FileReader reader = new FileReader(Constants.CGROUP_STATUS_FILE);
+        try (FileReader reader = new 
FileReader(CgroupUtils.CGROUP_STATUS_FILE);
              BufferedReader br = new BufferedReader(reader)){
             String str = null;
             while ((str = br.readLine()) != null) {
@@ -94,8 +88,10 @@ public class CgroupCenter implements CgroupOperation {
                 if (type == null) {
                     continue;
                 }
-                subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), 
Integer.valueOf(split[2])
-                        , Integer.valueOf(split[3]).intValue() == 1 ? true : 
false));
+                int hierarchyID = Integer.valueOf(split[1]);
+                int cgroupNum = Integer.valueOf(split[2]);
+                boolean enable =  Integer.valueOf(split[3]).intValue() == 1 ? 
true : false;
+                subSystems.add(new SubSystem(type, hierarchyID, cgroupNum, 
enable));
             }
             return subSystems;
         } catch (Exception e) {
@@ -105,11 +101,10 @@ public class CgroupCenter implements CgroupOperation {
     }
 
     @Override
-    public boolean enabled(SubSystemType subsystem) {
-
+    public boolean isSubSystemEnabled(SubSystemType subSystemType) {
         Set<SubSystem> subSystems = this.getSubSystems();
         for (SubSystem subSystem : subSystems) {
-            if (subSystem.getType() == subsystem) {
+            if (subSystem.getType() == subSystemType) {
                 return true;
             }
         }
@@ -117,25 +112,17 @@ public class CgroupCenter implements CgroupOperation {
     }
 
     @Override
-    public Hierarchy busy(SubSystemType subsystem) {
-        List<Hierarchy> hierarchies = this.getHierarchies();
-        for (Hierarchy hierarchy : hierarchies) {
-            for (SubSystemType type : hierarchy.getSubSystems()) {
-                if (type == subsystem) {
-                    return hierarchy;
-                }
-            }
-        }
-        return null;
+    public Hierarchy getHierarchyWithSubSystem(SubSystemType subSystem) {
+        return getHierarchyWithSubSystems(Arrays.asList(subSystem));
     }
 
     @Override
-    public Hierarchy busy(List<SubSystemType> subSystems) {
+    public Hierarchy getHierarchyWithSubSystems(List<SubSystemType> 
subSystems) {
         List<Hierarchy> hierarchies = this.getHierarchies();
         for (Hierarchy hierarchy : hierarchies) {
             Hierarchy ret = hierarchy;
-            for (SubSystemType subsystem : subSystems) {
-                if (!hierarchy.getSubSystems().contains(subsystem)) {
+            for (SubSystemType subSystem : subSystems) {
+                if (!hierarchy.getSubSystems().contains(subSystem)) {
                     ret = null;
                     break;
                 }
@@ -148,85 +135,82 @@ public class CgroupCenter implements CgroupOperation {
     }
 
     @Override
-    public Hierarchy mounted(Hierarchy hierarchy) {
-
-        List<Hierarchy> hierarchies = this.getHierarchies();
-        if (CgroupUtils.dirExists(hierarchy.getDir())) {
+    public boolean isMounted(Hierarchy hierarchy) {
+        if (Utils.CheckDirExists(hierarchy.getDir())) {
+            List<Hierarchy> hierarchies = this.getHierarchies();
             for (Hierarchy h : hierarchies) {
                 if (h.equals(hierarchy)) {
-                    return h;
+                    return true;
                 }
             }
         }
-        return null;
+        return false;
     }
 
     @Override
     public void mount(Hierarchy hierarchy) throws IOException {
-
-        if (this.mounted(hierarchy) != null) {
-            LOG.error("{} is mounted", hierarchy.getDir());
+        if (this.isMounted(hierarchy)) {
+            LOG.error("{} is already mounted", hierarchy.getDir());
             return;
         }
-        Set<SubSystemType> subsystems = hierarchy.getSubSystems();
-        for (SubSystemType type : subsystems) {
-            if (this.busy(type) != null) {
-                LOG.error("subsystem: {} is busy", type.name());
-                subsystems.remove(type);
+        Set<SubSystemType> subSystems = hierarchy.getSubSystems();
+        for (SubSystemType type : subSystems) {
+            Hierarchy hierarchyWithSubSystem = 
this.getHierarchyWithSubSystem(type);
+            if (hierarchyWithSubSystem != null) {
+                LOG.error("subSystem: {} is already mounted on hierarchy: {}", 
type.name(), hierarchyWithSubSystem);
+                subSystems.remove(type);
             }
         }
-        if (subsystems.size() == 0) {
+        if (subSystems.size() == 0) {
             return;
         }
-        if (!CgroupUtils.dirExists(hierarchy.getDir())) {
+        if (!Utils.CheckDirExists(hierarchy.getDir())) {
             new File(hierarchy.getDir()).mkdirs();
         }
-        String subSystems = CgroupUtils.reAnalyse(subsystems);
-        SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", 
subSystems);
+        String subSystemsName = CgroupUtils.subSystemsToString(subSystems);
+        SystemOperation.mount(subSystemsName, hierarchy.getDir(), "cgroup", 
subSystemsName);
 
     }
 
     @Override
     public void umount(Hierarchy hierarchy) throws IOException {
-        if (this.mounted(hierarchy) != null) {
+        if (this.isMounted(hierarchy)) {
             hierarchy.getRootCgroups().delete();
             SystemOperation.umount(hierarchy.getDir());
             CgroupUtils.deleteDir(hierarchy.getDir());
+        } else {
+            LOG.error("{} is not mounted", hierarchy.getDir());
         }
     }
 
     @Override
-    public void create(CgroupCommon cgroup) throws SecurityException {
+    public void createCgroup(CgroupCommon cgroup) throws SecurityException {
         if (cgroup.isRoot()) {
             LOG.error("You can't create rootCgroup in this function");
-            return;
+            throw new RuntimeException("You can't create rootCgroup in this 
function");
         }
         CgroupCommon parent = cgroup.getParent();
         while (parent != null) {
-            if (!CgroupUtils.dirExists(parent.getDir())) {
-                LOG.error(" {} is not existed", parent.getDir());
-                return;
+            if (!Utils.CheckDirExists(parent.getDir())) {
+                throw new RuntimeException("Parent " + parent.getDir() + "does 
not exist");
             }
             parent = parent.getParent();
         }
         Hierarchy h = cgroup.getHierarchy();
-        if (mounted(h) == null) {
-            LOG.error("{} is not mounted", h.getDir());
-            return;
+        if (!isMounted(h)) {
+            throw new RuntimeException("hierarchy " + h.getDir() + " is not 
mounted");
         }
-        if (CgroupUtils.dirExists(cgroup.getDir())) {
-            LOG.error("{} is existed", cgroup.getDir());
-            return;
+        if (Utils.CheckDirExists(cgroup.getDir())) {
+            throw new RuntimeException("cgroup {} already exists " + 
cgroup.getDir());
         }
 
-        //Todo perhaps thrown exception or print out error message is dir is 
not created successfully
         if (!(new File(cgroup.getDir())).mkdir()) {
-            LOG.error("Could not create cgroup dir at {}", cgroup.getDir());
+            throw new RuntimeException("Could not create cgroup dir at " + 
cgroup.getDir());
         }
     }
 
     @Override
-    public void delete(CgroupCommon cgroup) throws IOException {
+    public void deleteCgroup(CgroupCommon cgroup) throws IOException {
         cgroup.delete();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
index fbf96ba..b12fcc0 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
@@ -45,12 +45,8 @@ public class CgroupCommon implements CgroupCommonOperation {
 
     private final CgroupCommon parent;
 
-    private final Map<SubSystemType, CgroupCore> cores;
-
     private final boolean isRoot;
 
-    private final Set<CgroupCommon> children = new HashSet<CgroupCommon>();
-
     private static final Logger LOG = 
LoggerFactory.getLogger(CgroupCommon.class);
 
     public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) 
{
@@ -58,8 +54,6 @@ public class CgroupCommon implements CgroupCommonOperation {
         this.hierarchy = hierarchy;
         this.parent = parent;
         this.dir = parent.getDir() + "/" + name;
-        this.init();
-        cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), 
this.dir);
         this.isRoot = false;
     }
 
@@ -71,19 +65,17 @@ public class CgroupCommon implements CgroupCommonOperation {
         this.hierarchy = hierarchy;
         this.parent = null;
         this.dir = dir;
-        this.init();
-        cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), 
this.dir);
         this.isRoot = true;
     }
 
     @Override
     public void addTask(int taskId) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), 
String.valueOf(taskId));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, TASKS), 
String.valueOf(taskId));
     }
 
     @Override
     public Set<Integer> getTasks() throws IOException {
-        List<String> stringTasks = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
+        List<String> stringTasks = 
CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, TASKS));
         Set<Integer> tasks = new HashSet<Integer>();
         for (String task : stringTasks) {
             tasks.add(Integer.valueOf(task));
@@ -93,12 +85,12 @@ public class CgroupCommon implements CgroupCommonOperation {
 
     @Override
     public void addProcs(int pid) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), 
String.valueOf(pid));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
CGROUP_PROCS), String.valueOf(pid));
     }
 
     @Override
     public Set<Integer> getPids() throws IOException {
-        List<String> stringPids = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS));
+        List<String> stringPids = 
CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS));
         Set<Integer> pids = new HashSet<Integer>();
         for (String task : stringPids) {
             pids.add(Integer.valueOf(task));
@@ -109,41 +101,43 @@ public class CgroupCommon implements 
CgroupCommonOperation {
     @Override
     public void setNotifyOnRelease(boolean flag) throws IOException {
 
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
NOTIFY_ON_RELEASE), flag ? "1" : "0");
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
NOTIFY_ON_RELEASE), flag ? "1" : "0");
     }
 
     @Override
     public boolean getNotifyOnRelease() throws IOException {
-        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
     }
 
     @Override
     public void setReleaseAgent(String command) throws IOException {
         if (!this.isRoot) {
+            LOG.warn("Cannot set {} in {} since its not the root group", 
RELEASE_AGENT, this.isRoot);
             return;
         }
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), 
command);
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
RELEASE_AGENT), command);
     }
 
     @Override
     public String getReleaseAgent() throws IOException {
         if (!this.isRoot) {
+            LOG.warn("Cannot get {} in {} since its not the root group", 
RELEASE_AGENT, this.isRoot);
             return null;
         }
-        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
RELEASE_AGENT)).get(0);
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
RELEASE_AGENT)).get(0);
     }
 
     @Override
     public void setCgroupCloneChildren(boolean flag) throws IOException {
-        if (!this.cores.keySet().contains(SubSystemType.cpuset)) {
+        if (!getCores().keySet().contains(SubSystemType.cpuset)) {
             return;
         }
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
     }
 
     @Override
     public boolean getCgroupCloneChildren() throws IOException {
-        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
     }
 
     @Override
@@ -156,7 +150,7 @@ public class CgroupCommon implements CgroupCommonOperation {
             sb.append(' ');
             sb.append(arg);
         }
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CGROUP_EVENT_CONTROL), sb.toString());
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
CGROUP_EVENT_CONTROL), sb.toString());
     }
 
     public Hierarchy getHierarchy() {
@@ -176,6 +170,19 @@ public class CgroupCommon implements CgroupCommonOperation 
{
     }
 
     public Set<CgroupCommon> getChildren() {
+
+        File file = new File(this.dir);
+        File[] files = file.listFiles();
+        if (files == null) {
+            LOG.info("{} is not a directory", this.dir);
+            return null;
+        }
+        Set<CgroupCommon> children = new HashSet<CgroupCommon>();
+        for (File child : files) {
+            if (child.isDirectory()) {
+                children.add(new CgroupCommon(child.getName(), this.hierarchy, 
this));
+            }
+        }
         return children;
     }
 
@@ -184,7 +191,7 @@ public class CgroupCommon implements CgroupCommonOperation {
     }
 
     public Map<SubSystemType, CgroupCore> getCores() {
-        return cores;
+        return CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), 
this.dir);
     }
 
     public void delete() throws IOException {
@@ -195,7 +202,7 @@ public class CgroupCommon implements CgroupCommonOperation {
     }
 
     private void free() throws IOException {
-        for (CgroupCommon child : this.children) {
+        for (CgroupCommon child : getChildren()) {
             child.free();
         }
         if (this.isRoot) {
@@ -210,17 +217,54 @@ public class CgroupCommon implements 
CgroupCommonOperation {
         CgroupUtils.deleteDir(this.dir);
     }
 
-    private void init() {
-        File file = new File(this.dir);
-        File[] files = file.listFiles();
-        if (files == null) {
-            return;
-        }
-        for (File child : files) {
-            if (child.isDirectory()) {
-                this.children.add(new CgroupCommon(child.getName(), 
this.hierarchy, this));
+    @Override
+    public boolean equals(Object o) {
+        boolean ret = false;
+        if (o != null && (o instanceof CgroupCommon)) {
+
+            boolean hierarchyFlag =false;
+            if (((CgroupCommon)o).hierarchy != null && this.hierarchy != null) 
{
+                hierarchyFlag = 
((CgroupCommon)o).hierarchy.equals(this.hierarchy);
+            } else if (((CgroupCommon)o).hierarchy == null && this.hierarchy 
== null) {
+                hierarchyFlag = true;
+            } else {
+                hierarchyFlag = false;
+            }
+
+            boolean nameFlag = false;
+            if (((CgroupCommon)o).name != null && this.name != null) {
+                nameFlag = ((CgroupCommon)o).name.equals(this.name);
+            } else if (((CgroupCommon)o).name == null && this.name == null) {
+                nameFlag = true;
+            } else {
+                nameFlag = false;
             }
+
+            boolean dirFlag = false;
+            if (((CgroupCommon)o).dir != null && this.dir != null) {
+                dirFlag = ((CgroupCommon)o).dir.equals(this.dir);
+            } else if (((CgroupCommon)o).dir == null && this.dir == null) {
+                dirFlag = true;
+            } else {
+                dirFlag = false;
+            }
+            ret = hierarchyFlag && nameFlag && dirFlag;
         }
+        return ret;
     }
 
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (this.name != null ? this.name.hashCode() : 
0);
+        result = prime * result + (this.hierarchy != null ? 
this.hierarchy.hashCode() : 0);
+        result = prime * result + (this.dir != null ? this.dir.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return this.getName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
index f6b4ece..54368b6 100755
--- 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
+++ 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
@@ -78,5 +78,4 @@ public interface CgroupCommonOperation {
      * set event control config
      */
     public void setEventControl(String eventFd, String controlFd, String... 
args) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
index 98aedcf..53a8a7f 100755
--- 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
+++ 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
@@ -71,5 +71,4 @@ public class CgroupCoreFactory {
         }
         return result;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
index a3dbd9d..8b775be 100644
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.container.cgroup;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.storm.Config;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.container.cgroup.core.CpuCore;
@@ -28,6 +29,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -35,6 +38,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * Class that implements ResourceIsolationInterface that manages cgroups
+ */
 public class CgroupManager implements ResourceIsolationInterface {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CgroupManager.class);
@@ -49,16 +55,20 @@ public class CgroupManager implements 
ResourceIsolationInterface {
 
     private Map conf;
 
+    /**
+     * initialize initial data structures
+     * @param conf storm confs
+     */
     public void prepare(Map conf) throws IOException {
         this.conf = conf;
         this.rootDir = Config.getCgroupRootDir(this.conf);
         if (this.rootDir == null) {
-            throw new RuntimeException("Check configuration file. The 
supervisor.cgroup.rootdir is missing.");
+            throw new RuntimeException("Check configuration file. The 
storm.supervisor.cgroup.rootdir is missing.");
         }
 
         File file = new File(Config.getCgroupStormHierarchyDir(conf) + "/" + 
this.rootDir);
         if (!file.exists()) {
-            LOG.error("{}/{} is not existing.", 
Config.getCgroupStormHierarchyDir(conf), this.rootDir);
+            LOG.error("{} is not existing.", file.getPath());
             throw new RuntimeException("Check if cgconfig service starts or 
/etc/cgconfig.conf is consistent with configuration file.");
         }
         this.center = CgroupCenter.getInstance();
@@ -69,6 +79,30 @@ public class CgroupManager implements 
ResourceIsolationInterface {
     }
 
     /**
+     * initialize subsystems
+     */
+    private void prepareSubSystem(Map conf) throws IOException {
+        List<SubSystemType> subSystemTypes = new LinkedList<>();
+        for (String resource : Config.getCgroupStormResources(conf)) {
+            subSystemTypes.add(SubSystemType.getSubSystem(resource));
+        }
+
+        this.hierarchy = center.getHierarchyWithSubSystems(subSystemTypes);
+
+        if (this.hierarchy == null) {
+            Set<SubSystemType> types = new HashSet<SubSystemType>();
+            types.add(SubSystemType.cpu);
+            this.hierarchy = new 
Hierarchy(Config.getCgroupStormHierarchyName(conf), types, 
Config.getCgroupStormHierarchyDir(conf));
+        }
+        this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, 
this.hierarchy.getRootCgroups());
+
+        // set upper limit to how much cpu can be used by all workers running 
on supervisor node.
+        // This is done so that some cpu cycles will remain free to run the 
daemons and other miscellaneous OS operations.
+        CpuCore supervisorRootCPU = (CpuCore) 
this.rootCgroup.getCores().get(SubSystemType.cpu);
+        setCpuUsageUpperLimit(supervisorRootCPU, ((Number) 
this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue());
+    }
+
+    /**
      * Use cfs_period & cfs_quota to control the upper limit use of cpu core 
e.g.
      * If making a process to fully use two cpu cores, set cfs_period_us to
      * 100000 and set cfs_quota_us to 200000
@@ -84,22 +118,36 @@ public class CgroupManager implements 
ResourceIsolationInterface {
         }
     }
 
-    public String startNewWorker(String workerId, Map resourcesMap) throws 
SecurityException {
-        Number cpuNum = (Number) resourcesMap.get("cpu");
+    public void reserveResourcesForWorker(String workerId, Map resourcesMap) 
throws SecurityException {
+        Number cpuNum = null;
+        // The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor 
will override the resources assigned by RAS (Resource Aware Scheduler)
+        if (this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT) != null) {
+            cpuNum = (Number) 
this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT);
+        } else if(resourcesMap.get("cpu") != null) {
+            cpuNum = (Number) resourcesMap.get("cpu");
+        }
+
         Number totalMem = null;
-        if (resourcesMap.get("memory") != null) {
+        // The manually set STORM_WORKER_CGROUP_MEMORY_MB_LIMIT config on 
supervisor will override the resources assigned by RAS (Resource Aware Scheduler)
+        if (this.conf.get(Config.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT) != null) 
{
+            totalMem = (Number) 
this.conf.get(Config.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT);
+        } else if (resourcesMap.get("memory") != null) {
             totalMem = (Number) resourcesMap.get("memory");
         }
 
-        CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, 
this.rootCgroup);
-        this.center.create(workerGroup);
+        CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, 
this.rootCgroup);
+        try {
+            this.center.createCgroup(workerGroup);
+        } catch (Exception e) {
+            LOG.error("Error when creating Cgroup: {}", e);
+        }
 
         if (cpuNum != null) {
             CpuCore cpuCore = (CpuCore) 
workerGroup.getCores().get(SubSystemType.cpu);
             try {
                 cpuCore.setCpuShares(cpuNum.intValue());
             } catch (IOException e) {
-                throw new RuntimeException("Cannot set cpu.shares! Exception: 
" + e);
+                throw new RuntimeException("Cannot set cpu.shares! Exception: 
", e);
             }
         }
 
@@ -108,70 +156,55 @@ public class CgroupManager implements 
ResourceIsolationInterface {
             try {
                 
memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
             } catch (IOException e) {
-                throw new RuntimeException("Cannot set memory.limit_in_bytes! 
Exception: " + e);
-            }
-        }
-
-        StringBuilder sb = new StringBuilder();
-
-        sb.append(this.conf.get(Config.STORM_CGROUP_CGEXEC_CMD)).append(" -g 
");
-
-        Iterator<SubSystemType> it = this.hierarchy.getSubSystems().iterator();
-        while(it.hasNext()) {
-            sb.append(it.next().toString());
-            if(it.hasNext()) {
-                sb.append(",");
-            } else {
-                sb.append(":");
+                throw new RuntimeException("Cannot set memory.limit_in_bytes! 
Exception: ", e);
             }
         }
-
-        sb.append(workerGroup.getName());
-
-        return sb.toString();
     }
 
-    public void shutDownWorker(String workerId, boolean isKilled) {
+    public void releaseResourcesForWorker(String workerId) {
         CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, 
this.rootCgroup);
         try {
-            if (isKilled == false) {
-                for (Integer pid : workerGroup.getTasks()) {
-                    Utils.kill(pid);
-                }
-                Utils.sleepMs(1500);
-            }
             Set<Integer> tasks = workerGroup.getTasks();
-            if (isKilled == true && !tasks.isEmpty()) {
+            if (!tasks.isEmpty()) {
                 throw new Exception("Cannot correctly showdown worker CGroup " 
+ workerId + "tasks " + tasks.toString() + " still running!");
             }
-            this.center.delete(workerGroup);
+            this.center.deleteCgroup(workerGroup);
         } catch (Exception e) {
             LOG.error("Exception thrown when shutting worker {} Exception: 
{}", workerId, e);
         }
     }
 
-    public void close() throws IOException {
-        this.center.delete(this.rootCgroup);
-    }
+    @Override
+    public List<String> getLaunchCommand(String workerId, List<String> 
existingCommand) {
 
-    private void prepareSubSystem(Map conf) throws IOException {
-        List<SubSystemType> subSystemTypes = new LinkedList<>();
-        for (String resource : Config.getCgroupStormResources(conf)) {
-            subSystemTypes.add(SubSystemType.getSubSystem(resource));
+        CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, 
this.rootCgroup);
+
+        if(!this.rootCgroup.getChildren().contains(workerGroup)) {
+            LOG.error("cgroup {} doesn't exist! Need to reserve resources for 
worker first!", workerGroup);
+            return existingCommand;
         }
 
-        this.hierarchy = center.busy(subSystemTypes);
+        StringBuilder sb = new StringBuilder();
 
-        if (this.hierarchy == null) {
-            Set<SubSystemType> types = new HashSet<SubSystemType>();
-            types.add(SubSystemType.cpu);
-            this.hierarchy = new 
Hierarchy(Config.getCgroupStormHierarchyName(conf), types, 
Config.getCgroupStormHierarchyDir(conf));
+        sb.append(this.conf.get(Config.STORM_CGROUP_CGEXEC_CMD)).append(" -g 
");
+
+        Iterator<SubSystemType> it = this.hierarchy.getSubSystems().iterator();
+        while(it.hasNext()) {
+            sb.append(it.next().toString());
+            if(it.hasNext()) {
+                sb.append(",");
+            } else {
+                sb.append(":");
+            }
         }
-        this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, 
this.hierarchy.getRootCgroups());
+        sb.append(workerGroup.getName());
+        List<String> newCommand = new ArrayList<String>();
+        newCommand.addAll(Arrays.asList(sb.toString().split(" ")));
+        newCommand.addAll(existingCommand);
+        return newCommand;
+    }
 
-        // set upper limit to how much cpu can be used by all workers running 
on supervisor node.
-        // This is done so that some cpu cycles will remain free to run the 
daemons and other miscellaneous OS operations.
-        CpuCore supervisorRootCPU = (CpuCore)  
this.rootCgroup.getCores().get(SubSystemType.cpu);
-        setCpuUsageUpperLimit(supervisorRootCPU, ((Number) 
this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue());
+    public void close() throws IOException {
+        this.center.deleteCgroup(this.rootCgroup);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
index aa315ba..3626d04 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
@@ -21,26 +21,58 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
+/**
+ * An interface to manage cgroups
+ */
 public interface CgroupOperation {
 
+    /**
+     * Get a list of hierarchies
+     */
     public List<Hierarchy> getHierarchies();
 
+    /**
+     * get a list of available subsystems
+     */
     public Set<SubSystem> getSubSystems();
 
-    public boolean enabled(SubSystemType subsystem);
+    /**
+     * Check if a subsystem is enabled
+     */
+    public boolean isSubSystemEnabled(SubSystemType subsystem);
 
-    public Hierarchy busy(SubSystemType subsystem);
+    /**
+     * get the first hierarchy that has a certain subsystem mounted
+     */
+    public Hierarchy getHierarchyWithSubSystem(SubSystemType subsystem);
 
-    public Hierarchy busy(List<SubSystemType> subSystems);
+    /**
+     * get the first hierarchy that has a certain list of subsystems mounted
+     */
+    public Hierarchy getHierarchyWithSubSystems(List<SubSystemType> 
subSystems);
 
-    public Hierarchy mounted(Hierarchy hierarchy);
+    /**
+     * check if a hierarchy is mounted
+     */
+    public boolean isMounted(Hierarchy hierarchy);
 
+    /**
+     * mount a hierarchy
+     */
     public void mount(Hierarchy hierarchy) throws IOException;
 
+    /**
+     * unmount a hierarchy
+     */
     public void umount(Hierarchy hierarchy) throws IOException;
 
-    public void create(CgroupCommon cgroup) throws SecurityException;
-
-    public void delete(CgroupCommon cgroup) throws IOException;
+    /**
+     * create a cgroup
+     */
+    public void createCgroup(CgroupCommon cgroup) throws SecurityException;
 
+    /**
+     * delete a cgroup
+     */
+    public void deleteCgroup(CgroupCommon cgroup) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
index 7c88f5d..c41b491 100644
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
@@ -17,22 +17,26 @@
  */
 package org.apache.storm.container.cgroup;
 
+import com.google.common.io.Files;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 public class CgroupUtils {
 
+    public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
+    public static final String MOUNT_STATUS_FILE = "/proc/mounts";
+
     private static final Logger LOG = 
LoggerFactory.getLogger(CgroupUtils.class);
 
     public static void deleteDir(String dir) {
@@ -50,20 +54,14 @@ public class CgroupUtils {
         }
     }
 
-    public static boolean fileExists(String dir) {
-        File file = new File(dir);
-        return file.exists();
-    }
-
-    public static boolean dirExists(String dir) {
-        File file = new File(dir);
-        return file.isDirectory();
-    }
-
-    public static Set<SubSystemType> analyse(String str) {
+    /**
+     * Get a set of SubSystemType objects from a comma delimited list of 
subsystem names
+     */
+    public static Set<SubSystemType> getSubSystemsFromString(String str) {
         Set<SubSystemType> result = new HashSet<SubSystemType>();
         String[] subSystems = str.split(",");
         for (String subSystem : subSystems) {
+            // getSubSystem returns null for mount options in the string that 
are not cgroup subsystems
             SubSystemType type = SubSystemType.getSubSystem(subSystem);
             if (type != null) {
                 result.add(type);
@@ -72,7 +70,10 @@ public class CgroupUtils {
         return result;
     }
 
-    public static String reAnalyse(Set<SubSystemType> subSystems) {
+    /**
+     * Get a string that is a comma delimited list of subsystems
+     */
+    public static String subSystemsToString(Set<SubSystemType> subSystems) {
         StringBuilder sb = new StringBuilder();
         if (subSystems.size() == 0) {
             return sb.toString();
@@ -84,31 +85,23 @@ public class CgroupUtils {
     }
 
     public static boolean enabled() {
-        return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE);
+        return Utils.checkFileExists(CGROUP_STATUS_FILE);
     }
 
-    public static List<String> readFileByLine(String fileDir) throws 
IOException {
-        List<String> result = new ArrayList<String>();
-        File file = new File(fileDir);
-        try (FileReader fileReader = new FileReader(file);
-             BufferedReader reader = new BufferedReader(fileReader)) {
-            String tempString = null;
-            while ((tempString = reader.readLine()) != null) {
-                result.add(tempString);
-            }
-        }
-        return result;
+    public static List<String> readFileByLine(String filePath) throws 
IOException {
+        return Files.readLines(new File(filePath), Charset.defaultCharset());
     }
 
-    public static void writeFileByLine(String fileDir, List<String> strings) 
throws IOException {
-        File file = new File(fileDir);
+    public static void writeFileByLine(String filePath, List<String> 
linesToWrite) throws IOException {
+        LOG.debug("For CGroups - writing {} to {} ", linesToWrite, filePath);
+        File file = new File(filePath);
         if (!file.exists()) {
-            LOG.error("{} is no existed", fileDir);
+            LOG.error("{} does not exist", filePath);
             return;
         }
         try (FileWriter writer = new FileWriter(file, true);
              BufferedWriter bw = new BufferedWriter(writer)) {
-            for (String string : strings) {
+            for (String string : linesToWrite) {
                 bw.write(string);
                 bw.newLine();
                 bw.flush();
@@ -116,18 +109,11 @@ public class CgroupUtils {
         }
     }
 
-    public static void writeFileByLine(String fileDir, String string) throws 
IOException {
-        LOG.debug("For CGroups - writing {} to {} ", string, fileDir);
-        File file = new File(fileDir);
-        if (!file.exists()) {
-            LOG.error("{} is no existed", fileDir);
-            return;
-        }
-        try (FileWriter writer = new FileWriter(file, true);
-             BufferedWriter bw = new BufferedWriter(writer)) {
-            bw.write(string);
-            bw.newLine();
-            bw.flush();
-        }
+    public static void writeFileByLine(String filePath, String lineToWrite) 
throws IOException {
+        writeFileByLine(filePath, Arrays.asList(lineToWrite));
+    }
+
+    public static String getDir(String dir, String constant) {
+        return dir + constant;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java
deleted file mode 100755
index 0ce9643..0000000
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.container.cgroup;
-
-public class Constants {
-
-    public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
-
-    public static final String MOUNT_STATUS_FILE = "/proc/mounts";
-
-    public static String getDir(String dir, String constant) {
-        return dir + constant;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java
index 26def4c..57eb8ff 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.container.cgroup;
 
+/**
+ * a class that represents a device in linux
+ */
 public class Device {
 
     public final int major;

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
index 16df384..440531a 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
@@ -19,6 +19,9 @@ package org.apache.storm.container.cgroup;
 
 import java.util.Set;
 
+/**
+ * A class that describes a cgroup hierarchy
+ */
 public class Hierarchy {
 
     private final String name;
@@ -36,13 +39,19 @@ public class Hierarchy {
         this.subSystems = subSystems;
         this.dir = dir;
         this.rootCgroups = new CgroupCommon(this, dir);
-        this.type = CgroupUtils.reAnalyse(subSystems);
+        this.type = CgroupUtils.subSystemsToString(subSystems);
     }
 
+    /**
+     * get subsystems
+     */
     public Set<SubSystemType> getSubSystems() {
         return subSystems;
     }
 
+    /**
+     * get all subsystems in hierarchy as a comma delimited list
+     */
     public String getType() {
         return type;
     }
@@ -105,7 +114,7 @@ public class Hierarchy {
         return name;
     }
 
-    public boolean subSystemMounted(SubSystemType subsystem) {
+    public boolean isSubSystemMounted(SubSystemType subsystem) {
         for (SubSystemType type : this.subSystems) {
             if (type == subsystem) {
                 return true;
@@ -114,4 +123,8 @@ public class Hierarchy {
         return false;
     }
 
+    @Override
+    public String toString() {
+        return this.dir;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
index ac62e61..e354fb0 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.container.cgroup;
 
+/**
+ * a class that implements operations that can be performed on a cgroup 
subsystem
+ */
 public class SubSystem {
 
     private SubSystemType type;
@@ -70,9 +73,9 @@ public class SubSystem {
     public boolean equals(Object object) {
         boolean ret = false;
         if (object != null && object instanceof SubSystem) {
-            ret = (this.type.equals(((SubSystem)object).getType()) && 
this.hierarchyID == ((SubSystem)object).getHierarchyID());
+            ret = ((this.type == ((SubSystem)object).getType())
+                    && (this.hierarchyID == 
((SubSystem)object).getHierarchyID()));
         }
         return ret;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
index 3c6c020..914abcc 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
@@ -17,42 +17,20 @@
  */
 package org.apache.storm.container.cgroup;
 
+/**
+ * An enum class that describes the subsystems that can be used
+ */
 public enum SubSystemType {
 
-    // net_cls,ns is not supposted in ubuntu
+    // net_cls,ns is not supported in ubuntu
     blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, 
net_cls, net_prio;
 
+
     public static SubSystemType getSubSystem(String str) {
-        if (str.equals("blkio")) {
-            return blkio;
-        }
-        else if (str.equals("cpu")) {
-            return cpu;
-        }
-        else if (str.equals("cpuacct")) {
-            return cpuacct;
-        }
-        else if (str.equals("cpuset")) {
-            return cpuset;
-        }
-        else if (str.equals("devices")) {
-            return devices;
-        }
-        else if (str.equals("freezer")) {
-            return freezer;
-        }
-        else if (str.equals("memory")) {
-            return memory;
-        }
-        else if (str.equals("perf_event")) {
-            return perf_event;
-        }
-        else if (str.equals("net_cls")) {
-            return net_cls;
-        }
-        else if (str.equals("net_prio")) {
-            return net_prio;
+        try {
+            return SubSystemType.valueOf(str);
+        } catch (Exception e) {
+            return null;
         }
-        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
index ee3517a..6872b4a 100644
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
@@ -24,6 +24,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
+/**
+ * A class that implements system operations for using cgroups
+ */
 public class SystemOperation {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SystemOperation.class);
@@ -31,17 +34,24 @@ public class SystemOperation {
     public static boolean isRoot() throws IOException {
         String result = SystemOperation.exec("echo $EUID").substring(0, 1);
         return Integer.valueOf(result.substring(0, 
result.length())).intValue() == 0 ? true : false;
-    };
+    }
 
-    public static void mount(String name, String target, String type, String 
data) throws IOException {
+    public static void mount(String name, String target, String type, String 
options) throws IOException {
         StringBuilder sb = new StringBuilder();
-        sb.append("mount -t ").append(type).append(" -o 
").append(data).append(" ").append(name).append(" ").append(target);
+        sb.append("mount -t ")
+                .append(type)
+                .append(" -o ")
+                .append(options)
+                .append(" ")
+                .append(name)
+                .append(" ")
+                .append(target);
         SystemOperation.exec(sb.toString());
     }
 
-    public static void umount(String name) throws IOException {
+    public static void umount(String pathToDir) throws IOException {
         StringBuilder sb = new StringBuilder();
-        sb.append("umount ").append(name);
+        sb.append("umount ").append(pathToDir);
         SystemOperation.exec(sb.toString());
     }
 
@@ -59,7 +69,7 @@ public class SystemOperation {
             }
             return output;
         } catch (InterruptedException ie) {
-            throw new IOException(ie.toString());
+            throw new IOException(ie);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
index 5522601..c426610 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
@@ -18,7 +18,6 @@
 package org.apache.storm.container.cgroup.core;
 
 import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
 import org.apache.storm.container.cgroup.SubSystemType;
 import org.apache.storm.container.cgroup.Device;
 
@@ -63,19 +62,19 @@ public class BlkioCore implements CgroupCore {
 
     /* weight: 100-1000 */
     public void setBlkioWeight(int weight) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT), 
String.valueOf(weight));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_WEIGHT), String.valueOf(weight));
     }
 
     public int getBlkioWeight() throws IOException {
-        return 
Integer.valueOf(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_WEIGHT)).get(0)).intValue();
+        return 
Integer.valueOf(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_WEIGHT)).get(0)).intValue();
     }
 
     public void setBlkioWeightDevice(Device device, int weight) throws 
IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
BLKIO_WEIGHT_DEVICE), makeContext(device, weight));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_WEIGHT_DEVICE), makeContext(device, weight));
     }
 
     public Map<Device, Integer> getBlkioWeightDevice() throws IOException {
-        List<String> strings = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE));
+        List<String> strings = 
CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT_DEVICE));
         Map<Device, Integer> result = new HashMap<Device, Integer>();
         for (String string : strings) {
             String[] strArgs = string.split(" ");
@@ -87,123 +86,79 @@ public class BlkioCore implements CgroupCore {
     }
 
     public void setReadBps(Device device, long bps) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps));
     }
 
     public Map<Device, Long> getReadBps() throws IOException {
-        List<String> strings = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_READ_BPS_DEVICE));
-        Map<Device, Long> result = new HashMap<Device, Long>();
-        for (String string : strings) {
-            String[] strArgs = string.split(" ");
-            Device device = new Device(strArgs[0]);
-            Long bps = Long.valueOf(strArgs[1]);
-            result.put(device, bps);
-        }
-        return result;
+        return parseConfig(BLKIO_THROTTLE_READ_BPS_DEVICE);
     }
 
     public void setWriteBps(Device device, long bps) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps));
     }
 
     public Map<Device, Long> getWriteBps() throws IOException {
-        List<String> strings = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_WRITE_BPS_DEVICE));
-        Map<Device, Long> result = new HashMap<Device, Long>();
-        for (String string : strings) {
-            String[] strArgs = string.split(" ");
-            Device device = new Device(strArgs[0]);
-            Long bps = Long.valueOf(strArgs[1]);
-            result.put(device, bps);
-        }
-        return result;
+        return parseConfig(BLKIO_THROTTLE_WRITE_BPS_DEVICE);
     }
 
     public void setReadIOps(Device device, long iops) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops));
     }
 
     public Map<Device, Long> getReadIOps() throws IOException {
-        List<String> strings = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_READ_IOPS_DEVICE));
-        Map<Device, Long> result = new HashMap<Device, Long>();
-        for (String string : strings) {
-            String[] strArgs = string.split(" ");
-            Device device = new Device(strArgs[0]);
-            Long iops = Long.valueOf(strArgs[1]);
-            result.put(device, iops);
-        }
-        return result;
+        return parseConfig(BLKIO_THROTTLE_READ_IOPS_DEVICE);
     }
 
     public void setWriteIOps(Device device, long iops) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops));
     }
 
     public Map<Device, Long> getWriteIOps() throws IOException {
-        List<String> strings = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_WRITE_IOPS_DEVICE));
-        Map<Device, Long> result = new HashMap<Device, Long>();
-        for (String string : strings) {
-            String[] strArgs = string.split(" ");
-            Device device = new Device(strArgs[0]);
-            Long iops = Long.valueOf(strArgs[1]);
-            result.put(device, iops);
-        }
-        return result;
+        return parseConfig(BLKIO_THROTTLE_WRITE_IOPS_DEVICE);
     }
 
     public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws 
IOException {
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_IO_SERVICED)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_THROTTLE_IO_SERVICED)));
     }
 
     public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() 
throws IOException {
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_THROTTLE_IO_SERVICE_BYTES)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_THROTTLE_IO_SERVICE_BYTES)));
     }
 
     public Map<Device, Long> getBlkioTime() throws IOException {
-        Map<Device, Long> result = new HashMap<Device, Long>();
-        List<String> strs = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_TIME));
-        for (String str : strs) {
-            String[] strArgs = str.split(" ");
-            result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1]));
-        }
-        return result;
+        return parseConfig(BLKIO_TIME);
     }
 
     public Map<Device, Long> getBlkioSectors() throws IOException {
-        Map<Device, Long> result = new HashMap<Device, Long>();
-        List<String> strs = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_SECTORS));
-        for (String str : strs) {
-            String[] strArgs = str.split(" ");
-            result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1]));
-        }
-        return result;
+        return parseConfig(BLKIO_SECTORS);
     }
 
     public Map<Device, Map<RecordType, Long>> getIOServiced() throws 
IOException {
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_IO_SERVICED)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_IO_SERVICED)));
     }
 
     public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws 
IOException {
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_IO_SERVICE_BYTES)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_IO_SERVICE_BYTES)));
     }
 
     public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws 
IOException {
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_IO_SERVICE_TIME)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_IO_SERVICE_TIME)));
     }
 
     public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws 
IOException {
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_IO_WAIT_TIME)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_IO_WAIT_TIME)));
     }
 
     public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException 
{
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_IO_MERGED)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_IO_MERGED)));
     }
 
     public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException 
{
-        return 
this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
BLKIO_IO_QUEUED)));
+        return 
this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_IO_QUEUED)));
     }
 
     public void resetStats() throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
BLKIO_RESET_STATS), "1");
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
BLKIO_RESET_STATS), "1");
     }
 
     private String makeContext(Device device, Object data) {
@@ -212,6 +167,18 @@ public class BlkioCore implements CgroupCore {
         return sb.toString();
     }
 
+    private Map<Device, Long> parseConfig(String config) throws IOException {
+        List<String> strings = 
CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, config));
+        Map<Device, Long> result = new HashMap<Device, Long>();
+        for (String string : strings) {
+            String[] strArgs = string.split(" ");
+            Device device = new Device(strArgs[0]);
+            Long value = Long.valueOf(strArgs[1]);
+            result.put(device, value);
+        }
+        return result;
+    }
+
     private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> 
strs) {
         Map<Device, Map<RecordType, Long>> result = new HashMap<Device, 
Map<RecordType, Long>>();
         for (String str : strs) {
@@ -236,22 +203,9 @@ public class BlkioCore implements CgroupCore {
         read, write, sync, async, total;
 
         public static RecordType getType(String type) {
-            if (type.equals("Read")) {
-                return read;
-            }
-            else if (type.equals("Write")) {
-                return write;
-            }
-            else if (type.equals("Sync")) {
-                return sync;
-            }
-            else if (type.equals("Async")) {
-                return async;
-            }
-            else if (type.equals("Total")) {
-                return total;
-            }
-            else {
+            try {
+                return RecordType.valueOf(type.toLowerCase());
+            } catch (Exception e) {
                 return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
index 054ec0d..1d21251 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
@@ -18,7 +18,6 @@
 package org.apache.storm.container.cgroup.core;
 
 import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
 import org.apache.storm.container.cgroup.SubSystemType;
 
 import java.io.IOException;
@@ -45,47 +44,47 @@ public class CpuCore implements CgroupCore {
     }
 
     public void setCpuShares(int weight) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), 
String.valueOf(weight));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_SHARES), 
String.valueOf(weight));
     }
 
     public int getCpuShares() throws IOException {
-        return 
Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPU_SHARES)).get(0));
+        return 
Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CPU_SHARES)).get(0));
     }
 
     public void setCpuRtRuntimeUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CPU_RT_RUNTIME_US), String.valueOf(us));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
CPU_RT_RUNTIME_US), String.valueOf(us));
     }
 
     public long getCpuRtRuntimeUs() throws IOException {
-        return 
Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPU_RT_RUNTIME_US)).get(0));
+        return 
Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CPU_RT_RUNTIME_US)).get(0));
     }
 
     public void setCpuRtPeriodUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CPU_RT_PERIOD_US), String.valueOf(us));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
CPU_RT_PERIOD_US), String.valueOf(us));
     }
 
     public Long getCpuRtPeriodUs() throws IOException {
-        return 
Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPU_RT_PERIOD_US)).get(0));
+        return 
Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CPU_RT_PERIOD_US)).get(0));
     }
 
     public void setCpuCfsPeriodUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CPU_CFS_PERIOD_US), String.valueOf(us));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
CPU_CFS_PERIOD_US), String.valueOf(us));
     }
 
     public Long getCpuCfsPeriodUs() throws IOException {
-        return 
Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPU_CFS_PERIOD_US)).get(0));
+        return 
Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CPU_CFS_PERIOD_US)).get(0));
     }
 
     public void setCpuCfsQuotaUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CPU_CFS_QUOTA_US), String.valueOf(us));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, 
CPU_CFS_QUOTA_US), String.valueOf(us));
     }
 
     public Long getCpuCfsQuotaUs() throws IOException {
-        return 
Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPU_CFS_QUOTA_US)).get(0));
+        return 
Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CPU_CFS_QUOTA_US)).get(0));
     }
 
     public Stat getCpuStat() throws IOException {
-        return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPU_STAT)));
+        return new 
Stat(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_STAT)));
     }
 
     public static class Stat {

http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
index 56ae2dc..2e683f4 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
@@ -18,7 +18,6 @@
 package org.apache.storm.container.cgroup.core;
 
 import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
 import org.apache.storm.container.cgroup.SubSystemType;
 
 import java.io.IOException;
@@ -44,11 +43,11 @@ public class CpuacctCore implements CgroupCore {
     }
 
     public Long getCpuUsage() throws IOException {
-        return 
Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPUACCT_USAGE)).get(0));
+        return 
Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CPUACCT_USAGE)).get(0));
     }
 
     public Map<StatType, Long> getCpuStat() throws IOException {
-        List<String> strs = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_STAT));
+        List<String> strs = 
CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_STAT));
         Map<StatType, Long> result = new HashMap<StatType, Long>();
         result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1]));
         result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1]));
@@ -56,7 +55,7 @@ public class CpuacctCore implements CgroupCore {
     }
 
     public Long[] getPerCpuUsage() throws IOException {
-        String str = CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CPUACCT_USAGE_PERCPU)).get(0);
+        String str = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, 
CPUACCT_USAGE_PERCPU)).get(0);
         String[] strArgs = str.split(" ");
         Long[] result = new Long[strArgs.length];
         for (int i = 0; i < result.length; i++) {
@@ -65,7 +64,7 @@ public class CpuacctCore implements CgroupCore {
         return result;
     }
 
-    public enum StatType {
+    public static enum StatType {
         user, system;
     }
 

Reply via email to