[STORM-1336] - Evaluate/Port JStorm cgroup support

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc063ecc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc063ecc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc063ecc

Branch: refs/heads/master
Commit: fc063ecc3e1fb6a3d50b9cf27c756db0baf2e6c3
Parents: 9ddd29f
Author: Boyang Jerry Peng <[email protected]>
Authored: Fri Feb 12 10:38:34 2016 -0600
Committer: Boyang Jerry Peng <[email protected]>
Committed: Fri Feb 12 10:38:34 2016 -0600

----------------------------------------------------------------------
 conf/cgconfig.conf.example                      |  41 +++
 conf/defaults.yaml                              |  13 +
 .../clj/org/apache/storm/daemon/supervisor.clj  |  50 +++-
 storm-core/src/jvm/org/apache/storm/Config.java |  78 ++++++
 .../container/ResourceIsolationInterface.java   |  43 +++
 .../storm/container/cgroup/CgroupCenter.java    | 232 +++++++++++++++++
 .../storm/container/cgroup/CgroupCommon.java    | 226 ++++++++++++++++
 .../container/cgroup/CgroupCommonOperation.java |  82 ++++++
 .../container/cgroup/CgroupCoreFactory.java     |  75 ++++++
 .../storm/container/cgroup/CgroupManager.java   | 177 +++++++++++++
 .../storm/container/cgroup/CgroupOperation.java |  46 ++++
 .../storm/container/cgroup/CgroupUtils.java     | 133 ++++++++++
 .../storm/container/cgroup/Constants.java       |  30 +++
 .../apache/storm/container/cgroup/Device.java   |  72 ++++++
 .../storm/container/cgroup/Hierarchy.java       | 117 +++++++++
 .../storm/container/cgroup/SubSystem.java       |  78 ++++++
 .../storm/container/cgroup/SubSystemType.java   |  58 +++++
 .../storm/container/cgroup/SystemOperation.java |  65 +++++
 .../storm/container/cgroup/core/BlkioCore.java  | 259 +++++++++++++++++++
 .../storm/container/cgroup/core/CgroupCore.java |  26 ++
 .../storm/container/cgroup/core/CpuCore.java    | 136 ++++++++++
 .../container/cgroup/core/CpuacctCore.java      |  72 ++++++
 .../storm/container/cgroup/core/CpusetCore.java | 212 +++++++++++++++
 .../container/cgroup/core/DevicesCore.java      | 186 +++++++++++++
 .../container/cgroup/core/FreezerCore.java      |  67 +++++
 .../storm/container/cgroup/core/MemoryCore.java | 189 ++++++++++++++
 .../storm/container/cgroup/core/NetClsCore.java |  70 +++++
 .../container/cgroup/core/NetPrioCore.java      |  66 +++++
 .../src/jvm/org/apache/storm/utils/Utils.java   |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    |  16 +-
 .../test/jvm/org/apache/storm/TestCgroups.java  | 118 +++++++++
 31 files changed, 3017 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/conf/cgconfig.conf.example
----------------------------------------------------------------------
diff --git a/conf/cgconfig.conf.example b/conf/cgconfig.conf.example
new file mode 100644
index 0000000..555b83a
--- /dev/null
+++ b/conf/cgconfig.conf.example
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+mount {
+       cpuset  = /cgroup/cpuset;
+       cpu     = /cgroup/storm_resources;
+       cpuacct = /cgroup/cpuacct;
+       memory  = /cgroup/storm_resources;
+       devices = /cgroup/devices;
+       freezer = /cgroup/freezer;
+       net_cls = /cgroup/net_cls;
+       blkio   = /cgroup/blkio;
+}
+
+group storm {
+       perm {
+               task {
+                      uid = 500;
+                      gid = 500;
+               }
+               admin {
+                      uid = 500;
+                      gid = 500;
+               }
+       }
+       cpu {
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index d381f0d..e32e6f7 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -285,3 +285,16 @@ pacemaker.kerberos.users: []
 #default storm daemon metrics reporter plugins
 storm.daemon.metrics.reporter.plugins:
      - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
+
+storm.resource.isolation.plugin: 
"org.apache.storm.container.cgroup.CgroupManager"
+
+# Configs for CGroup support
+storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
+storm.cgroup.resources:
+    - cpu
+    - memory
+storm.cgroup.hierarchy.name: "storm"
+# Also determines whether the unit tests for cgroups run.  If storm.cgroup.enable is 
set to false, the unit tests for cgroups will not run
+storm.cgroup.enable: false
+storm.supervisor.cgroup.rootdir: "storm"
+storm.cgroup.cgexec.cmd: "/bin/cgexec"

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index ae9e92f..97f2825 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -43,7 +43,9 @@
   (:require [metrics.gauges :refer [defgauge]])
   (:require [metrics.meters :refer [defmeter mark!]])
   (:gen-class
-    :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] 
void]]))
+    :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] 
void]])
+  (:import [org.apache.storm.container.cgroup CgroupManager])
+  (:require [clojure.string :as str]))
 
 (defmeter supervisor:num-workers-launched)
 
@@ -307,7 +309,9 @@
             (log-debug "Removing path " path)
             (.delete (File. path))
             (catch Exception e))))) ;; on windows, the supervisor may still 
holds the lock on the worker directory
-    (try-cleanup-worker conf id))
+    (try-cleanup-worker conf id)
+    (if (conf STORM-CGROUP-ENABLE)
+      (.shutDownWorker (:cgroup-manager supervisor) id false)))
   (log-message "Shut down " (:supervisor-id supervisor) ":" id))
 
 (def SUPERVISOR-ZK-ACLS
@@ -350,6 +354,12 @@
    :sync-retry (atom 0)
    :download-lock (Object.)
    :stormid->profiler-actions (atom {})
+   :cgroup-manager (if (conf STORM-CGROUP-ENABLE)
+                     (let [cgroup-manager (.newInstance (Class/forName (conf 
STORM-RESOURCE-ISOLATION-PLUGIN)))]
+                       (.prepare cgroup-manager conf)
+                       (log-message "Using resource isolation plugin " (conf 
STORM-RESOURCE-ISOLATION-PLUGIN))
+                       cgroup-manager)
+                     nil)
    })
 
 (defn required-topo-files-exist?
@@ -388,7 +398,7 @@
                 (:storm-id assignment)
                 port
                 id
-                mem-onheap)
+                resources)
               [id port])
             (do
               (log-message "Missing topology storm code, so can't launch 
worker with assignment "
@@ -1054,7 +1064,7 @@
       (Utils/createSymlink worker-dir topo-dir "artifacts" (str port)))))
 
 (defmethod launch-worker
-    :distributed [supervisor storm-id port worker-id mem-onheap]
+    :distributed [supervisor storm-id port worker-id resources]
     (let [conf (:conf supervisor)
           run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
           storm-home (System/getProperty "storm.home")
@@ -1078,9 +1088,15 @@
                         (Utils/addToClasspath [stormjar])
                         (Utils/addToClasspath topo-classpath))
           top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
-          mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not 
zero
-                       (int (Math/ceil mem-onheap)) ;; round up
+          mem-onheap (if (and (.get_mem_on_heap resources) (> 
(.get_mem_on_heap resources) 0)) ;; not nil and not zero
+                       (int (Math/ceil (.get_mem_on_heap resources))) ;; round 
up
                        (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use 
default value
+          mem-offheap (if (.get_mem_off_heap resources)
+                        (int (Math/ceil (.get_mem_off_heap resources))) ;; 
round up
+                        0)
+
+          cpu (int (Math/ceil (.get_cpu resources)))
+
           gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf 
WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap)
           topo-worker-logwriter-childopts (storm-conf 
TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS)
           user (storm-conf TOPOLOGY-SUBMITTER-USER)
@@ -1104,8 +1120,26 @@
                                             (str "file:///" 
storm-log4j2-conf-dir))
                                           storm-log4j2-conf-dir)
                                      Utils/FILE_PATH_SEPARATOR "worker.xml")
+
+          cgroup-command (if (conf STORM-CGROUP-ENABLE)
+                           (str/split
+                             (.startNewWorker (:cgroup-manager supervisor) 
worker-id
+                               (merge
+                                 ;; The manually set STORM-WORKER-CGROUP-MEMORY-MB-LIMIT 
config on supervisor will override resources assigned by RAS (Resource Aware 
Scheduler)
+                                 (cond
+                                   (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT) 
{"memory" (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT)}
+                                   (+ mem-onheap mem-offheap) {"memory" (+ 
mem-onheap mem-offheap)}
+                                   :else nil)
+                                 ;; The manually set STORM-WORKER-CGROUP-CPU-LIMIT 
config on supervisor will override resources assigned by RAS (Resource Aware 
Scheduler)
+                                 (cond
+                                   (conf STORM-WORKER-CGROUP-CPU-LIMIT) {"cpu" 
(conf STORM-WORKER-CGROUP-CPU-LIMIT)}
+                                   (not= cpu nil) {"cpu" cpu}
+                                   :else nil))) #" "))
+
           command (concat
-                    [(java-cmd) "-cp" classpath 
+                    (if (conf STORM-CGROUP-ENABLE)
+                      cgroup-command)
+                    [(java-cmd) "-cp" classpath
                      topo-worker-logwriter-childopts
                      (str "-Dlogfile.name=" logfilename)
                      (str "-Dstorm.home=" storm-home)
@@ -1200,7 +1234,7 @@
           (FileUtils/copyDirectory (File. (.getFile url)) (File. 
target-dir)))))))
 
 (defmethod launch-worker
-    :local [supervisor storm-id port worker-id mem-onheap]
+    :local [supervisor storm-id port worker-id resources]
     (let [conf (:conf supervisor)
           pid (Utils/uuid)
           worker (worker/mk-worker conf

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 74231a0..a5c1ea0 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm;
 
+import org.apache.storm.container.ResourceIsolationInterface;
 import 
org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import 
org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -2194,6 +2195,63 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final Object CLIENT_JAR_TRANSFORMER = 
"client.jartransformer.class";
 
+
+    @isImplementationOfClass(implementsClass = 
ResourceIsolationInterface.class)
+    public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = 
"storm.resource.isolation.plugin";
+
+    /**
+     * CGroup settings below
+     */
+
+    /**
+     * root directory of the storm cgroup hierarchy
+     */
+    @isString
+    public static final Object STORM_CGROUP_HIERARCHY_DIR = 
"storm.cgroup.hierarchy.dir";
+
+    /**
+     * resources to be controlled by cgroups
+     */
+    @isStringList
+    public static final Object STORM_CGROUP_RESOURCES = 
"storm.cgroup.resources";
+
+    /**
+     * name for the cgroup hierarchy
+     */
+    @isString
+    public static final Object STORM_CGROUP_HIERARCHY_NAME = 
"storm.cgroup.hierarchy.name";
+
+    /**
+     * flag to determine whether to use cgroups
+     */
+    @isBoolean
+    public static final String STORM_CGROUP_ENABLE = "storm.cgroup.enable";
+
+    /**
+     * root directory for cgroups
+     */
+    @isString
+    public static String STORM_SUPERVISOR_CGROUP_ROOTDIR = 
"storm.supervisor.cgroup.rootdir";
+
+    /**
+     * the manually set memory limit (in MB) for each CGroup on supervisor node
+     */
+    @isPositiveNumber
+    public static String STORM_WORKER_CGROUP_MEMORY_MB_LIMIT = 
"storm.worker.cgroup.memory.mb.limit";
+
+    /**
+     * the manually set cpu share for each CGroup on supervisor node
+     */
+    @isPositiveNumber
+    public static String STORM_WORKER_CGROUP_CPU_LIMIT = 
"storm.worker.cgroup.cpu.limit";
+
+    /**
+     * full path to cgexec command
+     */
+    @isString
+    public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";
+
+
     public static void setClasspath(Map conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }
@@ -2406,4 +2464,24 @@ public class Config extends HashMap<String, Object> {
             this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, clazz.getName());
         }
     }
+
+    public static String getCgroupRootDir(Map conf) {
+        return (String) conf.get(STORM_SUPERVISOR_CGROUP_ROOTDIR);
+    }
+
+    public static String getCgroupStormHierarchyDir(Map conf) {
+        return (String) conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
+    }
+
+    public static ArrayList<String> getCgroupStormResources(Map conf) {
+        ArrayList<String> ret = new ArrayList<String>();
+        for (String entry : ((Iterable<String>) 
conf.get(Config.STORM_CGROUP_RESOURCES))) {
+            ret.add(entry);
+        }
+        return ret;
+    }
+
+    public static String getCgroupStormHierarchyName(Map conf) {
+        return (String) conf.get(Config.STORM_CGROUP_HIERARCHY_NAME);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java 
b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
new file mode 100644
index 0000000..8e52bc7
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container;
+
+import java.util.Map;
+
+/**
+ * A plugin to support resource isolation and limitation within Storm
+ */
+public interface ResourceIsolationInterface {
+
+    /**
+     * @param workerId worker id of the worker to start
+     * @param resources set of resources to limit
+     * @return a String that includes the command used to start the worker.  
The string returned from this function
+     * will be concatenated to the front of the command to launch 
logwriter/worker in supervisor.clj
+     */
+    public String startNewWorker(String workerId, Map resources);
+
+    /**
+     * This function will be called when the worker needs to shut down.  This 
function should include logic to clean up after a worker is shut down
+     * @param workerId worker id to shutdown and clean up after
+     * @param isKilled whether to actually kill worker
+     */
+    public void shutDownWorker(String workerId, boolean isKilled);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
new file mode 100644
index 0000000..f7e7f69
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CgroupCenter implements CgroupOperation {
+
+    private static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class);
+
+    private static CgroupCenter instance;
+
+    private CgroupCenter() {
+
+    }
+
+    /**
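+     * Lazily creates the singleton; synchronized, so safe to call concurrently
+     * 
+     * @return the shared instance, or null when CgroupUtils.enabled() is false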
+     */
+    public synchronized static CgroupCenter getInstance() {
+        if (instance == null) {
+            instance = new CgroupCenter();
+        }
+        return CgroupUtils.enabled() ? instance : null;
+    }
+
+    @Override
+    public List<Hierarchy> getHierarchies() {
+
+        Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>();
+
+        try (FileReader reader = new FileReader(Constants.MOUNT_STATUS_FILE);
+             BufferedReader br = new BufferedReader(reader)) {
+            String str = null;
+            while ((str = br.readLine()) != null) {
+                String[] strSplit = str.split(" ");
+                if (!strSplit[2].equals("cgroup")) {
+                    continue;
+                }
+                String name = strSplit[0];
+                String type = strSplit[3];
+                String dir = strSplit[1];
+                Hierarchy h = hierarchies.get(type);
+                h = new Hierarchy(name, CgroupUtils.analyse(type), dir);
+                hierarchies.put(type, h);
+            }
+            return new ArrayList<Hierarchy>(hierarchies.values());
+        } catch (Exception e) {
+            LOG.error("Get hierarchies error {}", e);
+        }
+        return null;
+    }
+
+    @Override
+    public Set<SubSystem> getSubSystems() {
+
+        Set<SubSystem> subSystems = new HashSet<SubSystem>();
+
+        try (FileReader reader = new FileReader(Constants.CGROUP_STATUS_FILE);
+             BufferedReader br = new BufferedReader(reader)){
+            String str = null;
+            while ((str = br.readLine()) != null) {
+                String[] split = str.split("\t");
+                SubSystemType type = SubSystemType.getSubSystem(split[0]);
+                if (type == null) {
+                    continue;
+                }
+                subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), 
Integer.valueOf(split[2])
+                        , Integer.valueOf(split[3]).intValue() == 1 ? true : 
false));
+            }
+            return subSystems;
+        } catch (Exception e) {
+            LOG.error("Get subSystems error {}", e);
+        }
+        return null;
+    }
+
+    @Override
+    public boolean enabled(SubSystemType subsystem) {
+
+        Set<SubSystem> subSystems = this.getSubSystems();
+        for (SubSystem subSystem : subSystems) {
+            if (subSystem.getType() == subsystem) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Hierarchy busy(SubSystemType subsystem) {
+        List<Hierarchy> hierarchies = this.getHierarchies();
+        for (Hierarchy hierarchy : hierarchies) {
+            for (SubSystemType type : hierarchy.getSubSystems()) {
+                if (type == subsystem) {
+                    return hierarchy;
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Hierarchy busy(List<SubSystemType> subSystems) {
+        List<Hierarchy> hierarchies = this.getHierarchies();
+        for (Hierarchy hierarchy : hierarchies) {
+            Hierarchy ret = hierarchy;
+            for (SubSystemType subsystem : subSystems) {
+                if (!hierarchy.getSubSystems().contains(subsystem)) {
+                    ret = null;
+                    break;
+                }
+            }
+            if (ret != null) {
+                return ret;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Hierarchy mounted(Hierarchy hierarchy) {
+
+        List<Hierarchy> hierarchies = this.getHierarchies();
+        if (CgroupUtils.dirExists(hierarchy.getDir())) {
+            for (Hierarchy h : hierarchies) {
+                if (h.equals(hierarchy)) {
+                    return h;
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void mount(Hierarchy hierarchy) throws IOException {
+
+        if (this.mounted(hierarchy) != null) {
+            LOG.error("{} is mounted", hierarchy.getDir());
+            return;
+        }
+        Set<SubSystemType> subsystems = hierarchy.getSubSystems();
+        for (SubSystemType type : subsystems) {
+            if (this.busy(type) != null) {
+                LOG.error("subsystem: {} is busy", type.name());
+                subsystems.remove(type);
+            }
+        }
+        if (subsystems.size() == 0) {
+            return;
+        }
+        if (!CgroupUtils.dirExists(hierarchy.getDir())) {
+            new File(hierarchy.getDir()).mkdirs();
+        }
+        String subSystems = CgroupUtils.reAnalyse(subsystems);
+        SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", 
subSystems);
+
+    }
+
+    @Override
+    public void umount(Hierarchy hierarchy) throws IOException {
+        if (this.mounted(hierarchy) != null) {
+            hierarchy.getRootCgroups().delete();
+            SystemOperation.umount(hierarchy.getDir());
+            CgroupUtils.deleteDir(hierarchy.getDir());
+        }
+    }
+
+    @Override
+    public void create(CgroupCommon cgroup) throws SecurityException {
+        if (cgroup.isRoot()) {
+            LOG.error("You can't create rootCgroup in this function");
+            return;
+        }
+        CgroupCommon parent = cgroup.getParent();
+        while (parent != null) {
+            if (!CgroupUtils.dirExists(parent.getDir())) {
+                LOG.error(" {} is not existed", parent.getDir());
+                return;
+            }
+            parent = parent.getParent();
+        }
+        Hierarchy h = cgroup.getHierarchy();
+        if (mounted(h) == null) {
+            LOG.error("{} is not mounted", h.getDir());
+            return;
+        }
+        if (CgroupUtils.dirExists(cgroup.getDir())) {
+            LOG.error("{} is existed", cgroup.getDir());
+            return;
+        }
+
+        //Todo perhaps throw an exception or print out an error message if dir is 
not created successfully
+        if (!(new File(cgroup.getDir())).mkdir()) {
+            LOG.error("Could not create cgroup dir at {}", cgroup.getDir());
+        }
+    }
+
+    @Override
+    public void delete(CgroupCommon cgroup) throws IOException {
+        cgroup.delete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
new file mode 100755
index 0000000..fbf96ba
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CgroupCommon implements CgroupCommonOperation {
+
+    public static final String TASKS = "/tasks";
+    public static final String NOTIFY_ON_RELEASE = "/notify_on_release";
+    public static final String RELEASE_AGENT = "/release_agent";
+    public static final String CGROUP_CLONE_CHILDREN = 
"/cgroup.clone_children";
+    public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control";
+    public static final String CGROUP_PROCS = "/cgroup.procs";
+
+    private final Hierarchy hierarchy;
+
+    private final String name;
+
+    private final String dir;
+
+    private final CgroupCommon parent;
+
+    private final Map<SubSystemType, CgroupCore> cores;
+
+    private final boolean isRoot;
+
+    private final Set<CgroupCommon> children = new HashSet<CgroupCommon>();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CgroupCommon.class);
+
+    public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) 
{
+        this.name = parent.getName() + "/" + name;
+        this.hierarchy = hierarchy;
+        this.parent = parent;
+        this.dir = parent.getDir() + "/" + name;
+        this.init();
+        cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), 
this.dir);
+        this.isRoot = false;
+    }
+
+    /**
+     * rootCgroup
+     */
+    public CgroupCommon(Hierarchy hierarchy, String dir) {
+        this.name = "";
+        this.hierarchy = hierarchy;
+        this.parent = null;
+        this.dir = dir;
+        this.init();
+        cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), 
this.dir);
+        this.isRoot = true;
+    }
+
+    @Override
+    public void addTask(int taskId) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), 
String.valueOf(taskId));
+    }
+
+    @Override
+    public Set<Integer> getTasks() throws IOException {
+        List<String> stringTasks = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
+        Set<Integer> tasks = new HashSet<Integer>();
+        for (String task : stringTasks) {
+            tasks.add(Integer.valueOf(task));
+        }
+        return tasks;
+    }
+
+    @Override
+    public void addProcs(int pid) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), 
String.valueOf(pid));
+    }
+
+    @Override
+    public Set<Integer> getPids() throws IOException {
+        List<String> stringPids = 
CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS));
+        Set<Integer> pids = new HashSet<Integer>();
+        for (String task : stringPids) {
+            pids.add(Integer.valueOf(task));
+        }
+        return pids;
+    }
+
+    @Override
+    public void setNotifyOnRelease(boolean flag) throws IOException {
+
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
NOTIFY_ON_RELEASE), flag ? "1" : "0");
+    }
+
+    @Override
+    public boolean getNotifyOnRelease() throws IOException {
+        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
+    }
+
+    @Override
+    public void setReleaseAgent(String command) throws IOException {
+        if (!this.isRoot) {
+            return;
+        }
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), 
command);
+    }
+
+    @Override
+    public String getReleaseAgent() throws IOException {
+        if (!this.isRoot) {
+            return null;
+        }
+        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
RELEASE_AGENT)).get(0);
+    }
+
+    @Override
+    public void setCgroupCloneChildren(boolean flag) throws IOException {
+        if (!this.cores.keySet().contains(SubSystemType.cpuset)) {
+            return;
+        }
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+    }
+
+    @Override
+    public boolean getCgroupCloneChildren() throws IOException {
+        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, 
CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
+    }
+
+    @Override
+    public void setEventControl(String eventFd, String controlFd, String... 
args) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        sb.append(eventFd);
+        sb.append(' ');
+        sb.append(controlFd);
+        for (String arg : args) {
+            sb.append(' ');
+            sb.append(arg);
+        }
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CGROUP_EVENT_CONTROL), sb.toString());
+    }
+
+    public Hierarchy getHierarchy() {
+        return hierarchy;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDir() {
+        return dir;
+    }
+
+    public CgroupCommon getParent() {
+        return parent;
+    }
+
+    public Set<CgroupCommon> getChildren() {
+        return children;
+    }
+
+    public boolean isRoot() {
+        return isRoot;
+    }
+
+    public Map<SubSystemType, CgroupCore> getCores() {
+        return cores;
+    }
+
+    public void delete() throws IOException {
+        this.free();
+        if (!this.isRoot) {
+            this.parent.getChildren().remove(this);
+        }
+    }
+
+    private void free() throws IOException {
+        for (CgroupCommon child : this.children) {
+            child.free();
+        }
+        if (this.isRoot) {
+            return;
+        }
+        Set<Integer> tasks = this.getTasks();
+        if (tasks != null) {
+            for (Integer task : tasks) {
+                this.parent.addTask(task);
+            }
+        }
+        CgroupUtils.deleteDir(this.dir);
+    }
+
+    private void init() {
+        File file = new File(this.dir);
+        File[] files = file.listFiles();
+        if (files == null) {
+            return;
+        }
+        for (File child : files) {
+            if (child.isDirectory()) {
+                this.children.add(new CgroupCommon(child.getName(), 
this.hierarchy, this));
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
new file mode 100755
index 0000000..f6b4ece
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Operations common to every cgroup: task/process membership and the
+ * notify_on_release, release agent, cgroup.clone_children and event control
+ * settings.
+ */
+public interface CgroupCommonOperation {
+
+    /**
+     * Adds a task to the cgroup.
+     *
+     * @param taskid id of the task to add
+     */
+    void addTask(int taskid) throws IOException;
+
+    /**
+     * @return the ids of the tasks running in the cgroup
+     */
+    Set<Integer> getTasks() throws IOException;
+
+    /**
+     * Adds a process to the cgroup.
+     *
+     * @param pid the PID of the process to add
+     */
+    void addProcs(int pid) throws IOException;
+
+    /**
+     * @return the PIDs of the processes running in the cgroup
+     */
+    Set<Integer> getPids() throws IOException;
+
+    /**
+     * Sets the notify_on_release config of the cgroup.
+     *
+     * @param flag the value to set
+     */
+    void setNotifyOnRelease(boolean flag) throws IOException;
+
+    /**
+     * @return the notify_on_release config of the cgroup
+     */
+    boolean getNotifyOnRelease() throws IOException;
+
+    /**
+     * Sets the command that the release agent executes.
+     *
+     * @param command the command to set
+     */
+    void setReleaseAgent(String command) throws IOException;
+
+    /**
+     * @return the command that the release agent executes
+     */
+    String getReleaseAgent() throws IOException;
+
+    /**
+     * Sets the cgroup.clone_children config.
+     *
+     * @param flag the value to set
+     */
+    void setCgroupCloneChildren(boolean flag) throws IOException;
+
+    /**
+     * @return the cgroup.clone_children config
+     */
+    boolean getCgroupCloneChildren() throws IOException;
+
+    /**
+     * Sets the event control config.
+     *
+     * @param eventFd   event file descriptor, passed as a string
+     * @param controlFd control file descriptor, passed as a string
+     * @param args      any additional arguments
+     */
+    void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
new file mode 100755
index 0000000..98aedcf
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.container.cgroup.core.BlkioCore;
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.container.cgroup.core.CpuacctCore;
+import org.apache.storm.container.cgroup.core.CpusetCore;
+import org.apache.storm.container.cgroup.core.DevicesCore;
+import org.apache.storm.container.cgroup.core.FreezerCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.container.cgroup.core.NetClsCore;
+import org.apache.storm.container.cgroup.core.NetPrioCore;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Creates the {@link CgroupCore} objects that belong to a set of subsystem types.
+ */
+public class CgroupCoreFactory {
+
+    /**
+     * Builds a map holding one core per requested subsystem type. Types that
+     * have no core implementation here (e.g. {@code perf_event}) are left out
+     * of the result.
+     *
+     * @param types the subsystem types to create cores for
+     * @param dir   the directory handed to each core's constructor
+     * @return a new map from subsystem type to its core
+     */
+    public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) {
+        Map<SubSystemType, CgroupCore> cores = new HashMap<SubSystemType, CgroupCore>();
+        for (SubSystemType type : types) {
+            CgroupCore core = newCore(type, dir);
+            if (core != null) {
+                cores.put(type, core);
+            }
+        }
+        return cores;
+    }
+
+    /**
+     * Instantiates the core matching {@code type}, or returns {@code null}
+     * when the type has no core implementation.
+     */
+    private static CgroupCore newCore(SubSystemType type, String dir) {
+        switch (type) {
+        case blkio:
+            return new BlkioCore(dir);
+        case cpuacct:
+            return new CpuacctCore(dir);
+        case cpuset:
+            return new CpusetCore(dir);
+        case cpu:
+            return new CpuCore(dir);
+        case devices:
+            return new DevicesCore(dir);
+        case freezer:
+            return new FreezerCore(dir);
+        case memory:
+            return new MemoryCore(dir);
+        case net_cls:
+            return new NetClsCore(dir);
+        case net_prio:
+            return new NetPrioCore(dir);
+        default:
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
new file mode 100644
index 0000000..a3dbd9d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CgroupManager implements ResourceIsolationInterface {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CgroupManager.class);
+
+    private CgroupCenter center;
+
+    private Hierarchy hierarchy;
+
+    private CgroupCommon rootCgroup;
+
+    private static String rootDir;
+
+    private Map conf;
+
+    public void prepare(Map conf) throws IOException {
+        this.conf = conf;
+        this.rootDir = Config.getCgroupRootDir(this.conf);
+        if (this.rootDir == null) {
+            throw new RuntimeException("Check configuration file. The 
supervisor.cgroup.rootdir is missing.");
+        }
+
+        File file = new File(Config.getCgroupStormHierarchyDir(conf) + "/" + 
this.rootDir);
+        if (!file.exists()) {
+            LOG.error("{}/{} is not existing.", 
Config.getCgroupStormHierarchyDir(conf), this.rootDir);
+            throw new RuntimeException("Check if cgconfig service starts or 
/etc/cgconfig.conf is consistent with configuration file.");
+        }
+        this.center = CgroupCenter.getInstance();
+        if (this.center == null) {
+            throw new RuntimeException("Cgroup error, please check 
/proc/cgroups");
+        }
+        this.prepareSubSystem(this.conf);
+    }
+
+    /**
+     * User cfs_period & cfs_quota to control the upper limit use of cpu core 
e.g.
+     * If making a process to fully use two cpu cores, set cfs_period_us to
+     * 100000 and set cfs_quota_us to 200000
+     */
+    private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) 
throws IOException {
+
+        if (cpuCoreUpperLimit == -1) {
+            // No control of cpu usage
+            cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit);
+        } else {
+            cpuCore.setCpuCfsPeriodUs(100000);
+            cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit * 1000);
+        }
+    }
+
+    public String startNewWorker(String workerId, Map resourcesMap) throws 
SecurityException {
+        Number cpuNum = (Number) resourcesMap.get("cpu");
+        Number totalMem = null;
+        if (resourcesMap.get("memory") != null) {
+            totalMem = (Number) resourcesMap.get("memory");
+        }
+
+        CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, 
this.rootCgroup);
+        this.center.create(workerGroup);
+
+        if (cpuNum != null) {
+            CpuCore cpuCore = (CpuCore) 
workerGroup.getCores().get(SubSystemType.cpu);
+            try {
+                cpuCore.setCpuShares(cpuNum.intValue());
+            } catch (IOException e) {
+                throw new RuntimeException("Cannot set cpu.shares! Exception: 
" + e);
+            }
+        }
+
+        if (totalMem != null) {
+            MemoryCore memCore = (MemoryCore) 
workerGroup.getCores().get(SubSystemType.memory);
+            try {
+                
memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
+            } catch (IOException e) {
+                throw new RuntimeException("Cannot set memory.limit_in_bytes! 
Exception: " + e);
+            }
+        }
+
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.conf.get(Config.STORM_CGROUP_CGEXEC_CMD)).append(" -g 
");
+
+        Iterator<SubSystemType> it = this.hierarchy.getSubSystems().iterator();
+        while(it.hasNext()) {
+            sb.append(it.next().toString());
+            if(it.hasNext()) {
+                sb.append(",");
+            } else {
+                sb.append(":");
+            }
+        }
+
+        sb.append(workerGroup.getName());
+
+        return sb.toString();
+    }
+
+    public void shutDownWorker(String workerId, boolean isKilled) {
+        CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, 
this.rootCgroup);
+        try {
+            if (isKilled == false) {
+                for (Integer pid : workerGroup.getTasks()) {
+                    Utils.kill(pid);
+                }
+                Utils.sleepMs(1500);
+            }
+            Set<Integer> tasks = workerGroup.getTasks();
+            if (isKilled == true && !tasks.isEmpty()) {
+                throw new Exception("Cannot correctly showdown worker CGroup " 
+ workerId + "tasks " + tasks.toString() + " still running!");
+            }
+            this.center.delete(workerGroup);
+        } catch (Exception e) {
+            LOG.error("Exception thrown when shutting worker {} Exception: 
{}", workerId, e);
+        }
+    }
+
+    public void close() throws IOException {
+        this.center.delete(this.rootCgroup);
+    }
+
+    private void prepareSubSystem(Map conf) throws IOException {
+        List<SubSystemType> subSystemTypes = new LinkedList<>();
+        for (String resource : Config.getCgroupStormResources(conf)) {
+            subSystemTypes.add(SubSystemType.getSubSystem(resource));
+        }
+
+        this.hierarchy = center.busy(subSystemTypes);
+
+        if (this.hierarchy == null) {
+            Set<SubSystemType> types = new HashSet<SubSystemType>();
+            types.add(SubSystemType.cpu);
+            this.hierarchy = new 
Hierarchy(Config.getCgroupStormHierarchyName(conf), types, 
Config.getCgroupStormHierarchyDir(conf));
+        }
+        this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, 
this.hierarchy.getRootCgroups());
+
+        // set upper limit to how much cpu can be used by all workers running 
on supervisor node.
+        // This is done so that some cpu cycles will remain free to run the 
daemons and other miscellaneous OS operations.
+        CpuCore supervisorRootCPU = (CpuCore)  
this.rootCgroup.getCores().get(SubSystemType.cpu);
+        setCpuUsageUpperLimit(supervisorRootCPU, ((Number) 
this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
new file mode 100755
index 0000000..aa315ba
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * System-wide cgroup operations: querying hierarchies and subsystems,
+ * mounting and unmounting hierarchies, and creating and deleting cgroups.
+ */
+public interface CgroupOperation {
+
+    /** @return the known hierarchies */
+    List<Hierarchy> getHierarchies();
+
+    /** @return the known subsystems */
+    Set<SubSystem> getSubSystems();
+
+    /** @return whether the given subsystem is enabled */
+    boolean enabled(SubSystemType subsystem);
+
+    /**
+     * Looks up the hierarchy associated with a single subsystem.
+     * NOTE(review): presumably null when there is none - confirm against the implementation.
+     */
+    Hierarchy busy(SubSystemType subsystem);
+
+    /**
+     * Looks up the hierarchy associated with a list of subsystems.
+     * NOTE(review): presumably null when there is none - confirm against the implementation.
+     */
+    Hierarchy busy(List<SubSystemType> subSystems);
+
+    /**
+     * Looks up the mounted counterpart of the given hierarchy.
+     * NOTE(review): exact return contract is not visible here - confirm against the implementation.
+     */
+    Hierarchy mounted(Hierarchy hierarchy);
+
+    /** Mounts the given hierarchy. */
+    void mount(Hierarchy hierarchy) throws IOException;
+
+    /** Unmounts the given hierarchy. */
+    void umount(Hierarchy hierarchy) throws IOException;
+
+    /** Creates the given cgroup. */
+    void create(CgroupCommon cgroup) throws SecurityException;
+
+    /** Deletes the given cgroup. */
+    void delete(CgroupCommon cgroup) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
new file mode 100644
index 0000000..7c88f5d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class CgroupUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CgroupUtils.class);
+
+    public static void deleteDir(String dir) {
+        File d = new File(dir);
+        if (d.exists()) {
+            if (d.isDirectory()) {
+                if (!d.delete()) {
+                    throw new RuntimeException("Cannot delete dir " + dir);
+                }
+            } else {
+                throw new RuntimeException("dir " + dir + " is not a 
directory!");
+            }
+        } else {
+            LOG.warn("dir {} does not exist!", dir);
+        }
+    }
+
+    public static boolean fileExists(String dir) {
+        File file = new File(dir);
+        return file.exists();
+    }
+
+    public static boolean dirExists(String dir) {
+        File file = new File(dir);
+        return file.isDirectory();
+    }
+
+    public static Set<SubSystemType> analyse(String str) {
+        Set<SubSystemType> result = new HashSet<SubSystemType>();
+        String[] subSystems = str.split(",");
+        for (String subSystem : subSystems) {
+            SubSystemType type = SubSystemType.getSubSystem(subSystem);
+            if (type != null) {
+                result.add(type);
+            }
+        }
+        return result;
+    }
+
+    public static String reAnalyse(Set<SubSystemType> subSystems) {
+        StringBuilder sb = new StringBuilder();
+        if (subSystems.size() == 0) {
+            return sb.toString();
+        }
+        for (SubSystemType type : subSystems) {
+            sb.append(type.name()).append(",");
+        }
+        return sb.toString().substring(0, sb.length() - 1);
+    }
+
+    public static boolean enabled() {
+        return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE);
+    }
+
+    public static List<String> readFileByLine(String fileDir) throws 
IOException {
+        List<String> result = new ArrayList<String>();
+        File file = new File(fileDir);
+        try (FileReader fileReader = new FileReader(file);
+             BufferedReader reader = new BufferedReader(fileReader)) {
+            String tempString = null;
+            while ((tempString = reader.readLine()) != null) {
+                result.add(tempString);
+            }
+        }
+        return result;
+    }
+
+    public static void writeFileByLine(String fileDir, List<String> strings) 
throws IOException {
+        File file = new File(fileDir);
+        if (!file.exists()) {
+            LOG.error("{} is no existed", fileDir);
+            return;
+        }
+        try (FileWriter writer = new FileWriter(file, true);
+             BufferedWriter bw = new BufferedWriter(writer)) {
+            for (String string : strings) {
+                bw.write(string);
+                bw.newLine();
+                bw.flush();
+            }
+        }
+    }
+
+    public static void writeFileByLine(String fileDir, String string) throws 
IOException {
+        LOG.debug("For CGroups - writing {} to {} ", string, fileDir);
+        File file = new File(fileDir);
+        if (!file.exists()) {
+            LOG.error("{} is no existed", fileDir);
+            return;
+        }
+        try (FileWriter writer = new FileWriter(file, true);
+             BufferedWriter bw = new BufferedWriter(writer)) {
+            bw.write(string);
+            bw.newLine();
+            bw.flush();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java
new file mode 100755
index 0000000..0ce9643
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * Paths of the system files consulted for cgroup and mount status, plus a
+ * small path-building helper.
+ */
+public class Constants {
+
+    /** File checked by {@code CgroupUtils.enabled()}. */
+    public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
+
+    public static final String MOUNT_STATUS_FILE = "/proc/mounts";
+
+    /**
+     * Concatenates {@code dir} and {@code constant} as-is; no separator is
+     * inserted between them.
+     */
+    public static String getDir(String dir, String constant) {
+        return new StringBuilder().append(dir).append(constant).toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java
new file mode 100755
index 0000000..26def4c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Device.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * A device identified by its major and minor numbers, rendered as
+ * {@code major:minor}.
+ */
+public class Device {
+
+    public final int major;
+    public final int minor;
+
+    public Device(int major, int minor) {
+        this.major = major;
+        this.minor = minor;
+    }
+
+    /**
+     * Parses a device from its {@code major:minor} text form.
+     *
+     * @param str text whose first two colon separated fields are integers
+     */
+    public Device(String str) {
+        String[] parts = str.split(":");
+        this.major = Integer.parseInt(parts[0]);
+        this.minor = Integer.parseInt(parts[1]);
+    }
+
+    /** @return {@code major:minor} */
+    @Override
+    public String toString() {
+        return major + ":" + minor;
+    }
+
+    @Override
+    public int hashCode() {
+        // Same value as the usual prime-31 accumulation over (major, minor).
+        return 31 * (31 + major) + minor;
+    }
+
+    /** Two devices are equal when both their major and minor numbers match. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Device that = (Device) obj;
+        return major == that.major && minor == that.minor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
new file mode 100755
index 0000000..16df384
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.util.Set;
+
+/**
+ * A cgroup hierarchy: a name, the subsystems attached to it, the directory it
+ * lives in and its root cgroup. Equality and hashing use the directory, the
+ * name and the comma joined subsystem names ({@code type}).
+ */
+public class Hierarchy {
+
+    private final String name;
+
+    private final Set<SubSystemType> subSystems;
+
+    private final String type;
+
+    private final String dir;
+
+    private final CgroupCommon rootCgroups;
+
+    public Hierarchy(String name, Set<SubSystemType> subSystems, String dir) {
+        this.name = name;
+        this.subSystems = subSystems;
+        this.dir = dir;
+        // The root cgroup is created before 'type' is assigned; keep this order.
+        this.rootCgroups = new CgroupCommon(this, dir);
+        this.type = CgroupUtils.reAnalyse(subSystems);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDir() {
+        return dir;
+    }
+
+    /** @return the subsystem names joined by commas, as computed in the constructor */
+    public String getType() {
+        return type;
+    }
+
+    public Set<SubSystemType> getSubSystems() {
+        return subSystems;
+    }
+
+    public CgroupCommon getRootCgroups() {
+        return rootCgroups;
+    }
+
+    /** @return whether {@code subsystem} is one of this hierarchy's subsystems */
+    public boolean subSystemMounted(SubSystemType subsystem) {
+        for (SubSystemType attached : this.subSystems) {
+            if (attached == subsystem) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        // Field order (dir, name, type) determines the resulting value.
+        for (String part : new String[] {dir, name, type}) {
+            hash = 31 * hash + (part == null ? 0 : part.hashCode());
+        }
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Hierarchy that = (Hierarchy) obj;
+        return sameText(dir, that.dir) && sameText(name, that.name) && sameText(type, that.type);
+    }
+
+    /** Null-safe string equality. */
+    private static boolean sameText(String left, String right) {
+        return left == null ? right == null : left.equals(right);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
new file mode 100755
index 0000000..ac62e61
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+public class SubSystem {
+
+    private SubSystemType type;
+
+    private int hierarchyID;
+
+    private int cgroupsNum;
+
+    private boolean enable;
+
+    public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, 
boolean enable) {
+        this.type = type;
+        this.hierarchyID = hierarchyID;
+        this.cgroupsNum = cgroupNum;
+        this.enable = enable;
+    }
+
+    public SubSystemType getType() {
+        return type;
+    }
+
+    public void setType(SubSystemType type) {
+        this.type = type;
+    }
+
+    public int getHierarchyID() {
+        return hierarchyID;
+    }
+
+    public void setHierarchyID(int hierarchyID) {
+        this.hierarchyID = hierarchyID;
+    }
+
+    public int getCgroupsNum() {
+        return cgroupsNum;
+    }
+
+    public void setCgroupsNum(int cgroupsNum) {
+        this.cgroupsNum = cgroupsNum;
+    }
+
+    public boolean isEnable() {
+        return enable;
+    }
+
+    public void setEnable(boolean enable) {
+        this.enable = enable;
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        boolean ret = false;
+        if (object != null && object instanceof SubSystem) {
+            ret = (this.type.equals(((SubSystem)object).getType()) && 
this.hierarchyID == ((SubSystem)object).getHierarchyID());
+        }
+        return ret;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
new file mode 100755
index 0000000..3c6c020
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * The cgroup subsystems known to this package. Constant names are the
+ * lower-case subsystem names themselves, so a name can be mapped to its
+ * constant by direct string comparison.
+ */
+public enum SubSystemType {
+
+    // net_cls and ns are not supported in ubuntu
+    blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;
+
+    /**
+     * Looks up the subsystem whose constant name equals the given string
+     * exactly (the comparison is case-sensitive).
+     *
+     * @param str subsystem name such as {@code "cpu"}; may be null
+     * @return the matching constant, or null when {@code str} is null or does
+     *         not name a known subsystem
+     */
+    public static SubSystemType getSubSystem(String str) {
+        for (SubSystemType candidate : values()) {
+            // name().equals(null) is false, so a null argument falls through to "not found"
+            if (candidate.name().equals(str)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
new file mode 100644
index 0000000..ee3517a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.cgroup;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Static helpers that run shell commands through {@code /bin/bash -c}.
+ *
+ * <p>NOTE(review): {@link #mount} and {@link #umount} splice their arguments
+ * into the command line unquoted, so callers must only pass trusted values
+ * that contain no shell metacharacters.
+ */
+public class SystemOperation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
+
+    /**
+     * Reports whether the shell started by {@link #exec(String)} sees an
+     * effective user ID of 0.
+     *
+     * @return true when {@code echo $EUID} prints exactly {@code 0}
+     * @throws IOException if the command cannot be run or writes to stderr
+     */
+    public static boolean isRoot() throws IOException {
+        // echo terminates its output with a newline; strip it before comparing
+        String euid = SystemOperation.exec("echo $EUID").trim();
+        return "0".equals(euid);
+    }
+
+    /**
+     * Runs {@code mount -t <type> -o <data> <name> <target>}.
+     *
+     * @throws IOException if the command cannot be run or writes to stderr
+     */
+    public static void mount(String name, String target, String type, String data) throws IOException {
+        String cmd = "mount -t " + type + " -o " + data + " " + name + " " + target;
+        SystemOperation.exec(cmd);
+    }
+
+    /**
+     * Runs {@code umount <name>}.
+     *
+     * @throws IOException if the command cannot be run or writes to stderr
+     */
+    public static void umount(String name) throws IOException {
+        SystemOperation.exec("umount " + name);
+    }
+
+    /**
+     * Executes {@code cmd} with {@code /bin/bash -c} and returns everything the
+     * command wrote to standard output.
+     *
+     * <p>Any output on standard error is treated as failure. The exit status of
+     * the command is not inspected.
+     *
+     * @param cmd shell command line
+     * @return the command's standard output
+     * @throws IOException if the process cannot be started, if it writes
+     *                     anything to standard error (the message is that
+     *                     output), or if the calling thread is interrupted
+     *                     while waiting for the process
+     */
+    public static String exec(String cmd) throws IOException {
+        LOG.debug("Shell cmd: {}", cmd);
+        Process process = new ProcessBuilder("/bin/bash", "-c", cmd).start();
+        try {
+            // Drain the pipes BEFORE waiting for exit: a child that fills the
+            // pipe buffer blocks until somebody reads, so waiting first can
+            // hang forever. stdout is read to EOF, then stderr; a command that
+            // floods stderr while stdout is still open could still stall, but
+            // the short commands issued from this class do not.
+            String output = IOUtils.toString(process.getInputStream());
+            String errorOutput = IOUtils.toString(process.getErrorStream());
+            process.waitFor();
+            LOG.debug("Shell Output: {}", output);
+            if (errorOutput.length() != 0) {
+                LOG.error("Shell Error Output: {}", errorOutput);
+                throw new IOException(errorOutput);
+            }
+            return output;
+        } catch (InterruptedException ie) {
+            // Restore the interrupt flag for callers and keep the original cause
+            Thread.currentThread().interrupt();
+            throw new IOException(ie.toString(), ie);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
new file mode 100755
index 0000000..5522601
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Accessor for the {@code blkio.*} control files found in one cgroup directory.
+ *
+ * <p>Setters write a single line to the corresponding file; getters read the
+ * file line by line and parse space-separated fields. Lines that do not have
+ * the expected number of fields are skipped rather than treated as errors.
+ */
+public class BlkioCore implements CgroupCore {
+
+    public static final String BLKIO_WEIGHT = "/blkio.weight";
+    public static final String BLKIO_WEIGHT_DEVICE = "/blkio.weight_device";
+    public static final String BLKIO_RESET_STATS = "/blkio.reset_stats";
+
+    public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = "/blkio.throttle.read_bps_device";
+    public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = "/blkio.throttle.write_bps_device";
+    public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = "/blkio.throttle.read_iops_device";
+    public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = "/blkio.throttle.write_iops_device";
+
+    public static final String BLKIO_THROTTLE_IO_SERVICED = "/blkio.throttle.io_serviced";
+    public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = "/blkio.throttle.io_service_bytes";
+
+    public static final String BLKIO_TIME = "/blkio.time";
+    public static final String BLKIO_SECTORS = "/blkio.sectors";
+    public static final String BLKIO_IO_SERVICED = "/blkio.io_serviced";
+    public static final String BLKIO_IO_SERVICE_BYTES = "/blkio.io_service_bytes";
+    public static final String BLKIO_IO_SERVICE_TIME = "/blkio.io_service_time";
+    public static final String BLKIO_IO_WAIT_TIME = "/blkio.io_wait_time";
+    public static final String BLKIO_IO_MERGED = "/blkio.io_merged";
+    public static final String BLKIO_IO_QUEUED = "/blkio.io_queued";
+
+    private final String dir;
+
+    /**
+     * @param dir cgroup directory that contains the {@code blkio.*} files
+     */
+    public BlkioCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.blkio;
+    }
+
+    /* weight: 100-1000 */
+    public void setBlkioWeight(int weight) throws IOException {
+        write(BLKIO_WEIGHT, String.valueOf(weight));
+    }
+
+    /** @return the integer found on the first line of {@code blkio.weight} */
+    public int getBlkioWeight() throws IOException {
+        return Integer.parseInt(readLines(BLKIO_WEIGHT).get(0));
+    }
+
+    public void setBlkioWeightDevice(Device device, int weight) throws IOException {
+        write(BLKIO_WEIGHT_DEVICE, makeContext(device, weight));
+    }
+
+    /** @return per-device weights parsed from {@code blkio.weight_device} */
+    public Map<Device, Integer> getBlkioWeightDevice() throws IOException {
+        Map<Device, Integer> result = new HashMap<Device, Integer>();
+        for (String line : readLines(BLKIO_WEIGHT_DEVICE)) {
+            String[] fields = line.split(" ");
+            if (fields.length < 2) {
+                // blank or truncated line: nothing to parse
+                continue;
+            }
+            result.put(new Device(fields[0]), Integer.valueOf(fields[1]));
+        }
+        return result;
+    }
+
+    public void setReadBps(Device device, long bps) throws IOException {
+        write(BLKIO_THROTTLE_READ_BPS_DEVICE, makeContext(device, bps));
+    }
+
+    public Map<Device, Long> getReadBps() throws IOException {
+        return readDeviceValues(BLKIO_THROTTLE_READ_BPS_DEVICE);
+    }
+
+    public void setWriteBps(Device device, long bps) throws IOException {
+        write(BLKIO_THROTTLE_WRITE_BPS_DEVICE, makeContext(device, bps));
+    }
+
+    public Map<Device, Long> getWriteBps() throws IOException {
+        return readDeviceValues(BLKIO_THROTTLE_WRITE_BPS_DEVICE);
+    }
+
+    public void setReadIOps(Device device, long iops) throws IOException {
+        write(BLKIO_THROTTLE_READ_IOPS_DEVICE, makeContext(device, iops));
+    }
+
+    public Map<Device, Long> getReadIOps() throws IOException {
+        return readDeviceValues(BLKIO_THROTTLE_READ_IOPS_DEVICE);
+    }
+
+    public void setWriteIOps(Device device, long iops) throws IOException {
+        write(BLKIO_THROTTLE_WRITE_IOPS_DEVICE, makeContext(device, iops));
+    }
+
+    public Map<Device, Long> getWriteIOps() throws IOException {
+        return readDeviceValues(BLKIO_THROTTLE_WRITE_IOPS_DEVICE);
+    }
+
+    public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_THROTTLE_IO_SERVICED));
+    }
+
+    public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_THROTTLE_IO_SERVICE_BYTES));
+    }
+
+    public Map<Device, Long> getBlkioTime() throws IOException {
+        return readDeviceValues(BLKIO_TIME);
+    }
+
+    public Map<Device, Long> getBlkioSectors() throws IOException {
+        return readDeviceValues(BLKIO_SECTORS);
+    }
+
+    public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_IO_SERVICED));
+    }
+
+    public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_IO_SERVICE_BYTES));
+    }
+
+    public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_IO_SERVICE_TIME));
+    }
+
+    public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_IO_WAIT_TIME));
+    }
+
+    public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_IO_MERGED));
+    }
+
+    public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException {
+        return this.analyseRecord(readLines(BLKIO_IO_QUEUED));
+    }
+
+    /** Writes {@code 1} to {@code blkio.reset_stats}. */
+    public void resetStats() throws IOException {
+        write(BLKIO_RESET_STATS, "1");
+    }
+
+    /** Reads every line of the given control file under this cgroup's directory. */
+    private List<String> readLines(String file) throws IOException {
+        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, file));
+    }
+
+    /** Writes {@code content} to the given control file under this cgroup's directory. */
+    private void write(String file, String content) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, file), content);
+    }
+
+    /**
+     * Parses a control file whose lines have the form {@code <device> <number>}
+     * into a device-to-value map. Lines with fewer than two fields are skipped.
+     */
+    private Map<Device, Long> readDeviceValues(String file) throws IOException {
+        Map<Device, Long> result = new HashMap<Device, Long>();
+        for (String line : readLines(file)) {
+            String[] fields = line.split(" ");
+            if (fields.length < 2) {
+                // blank or truncated line: nothing to parse
+                continue;
+            }
+            result.put(new Device(fields[0]), Long.valueOf(fields[1]));
+        }
+        return result;
+    }
+
+    /** Builds the {@code <device> <value>} line written to per-device control files. */
+    private String makeContext(Device device, Object data) {
+        return device.toString() + " " + data;
+    }
+
+    /**
+     * Groups lines of the form {@code <device> <type> <value>} by device and
+     * then by record type. Lines that do not consist of exactly three fields
+     * are ignored. A type string that {@link RecordType#getType(String)} does
+     * not recognise is stored under a null key.
+     */
+    private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) {
+        Map<Device, Map<RecordType, Long>> result = new HashMap<Device, Map<RecordType, Long>>();
+        for (String line : strs) {
+            String[] fields = line.split(" ");
+            if (fields.length != 3) {
+                continue;
+            }
+            Device device = new Device(fields[0]);
+            Map<RecordType, Long> record = result.get(device);
+            if (record == null) {
+                record = new HashMap<RecordType, Long>();
+                result.put(device, record);
+            }
+            record.put(RecordType.getType(fields[1]), Long.valueOf(fields[2]));
+        }
+        return result;
+    }
+
+    /** Kinds of per-device counters that appear in the three-field statistics files. */
+    public enum RecordType {
+        read, write, sync, async, total;
+
+        /**
+         * Maps the capitalised label used in the statistics files
+         * ({@code Read}, {@code Write}, {@code Sync}, {@code Async},
+         * {@code Total}) to its constant.
+         *
+         * @return the matching constant, or null for any other label
+         */
+        public static RecordType getType(String type) {
+            if (type.equals("Read")) {
+                return read;
+            }
+            if (type.equals("Write")) {
+                return write;
+            }
+            if (type.equals("Sync")) {
+                return sync;
+            }
+            if (type.equals("Async")) {
+                return async;
+            }
+            if (type.equals("Total")) {
+                return total;
+            }
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
new file mode 100755
index 0000000..a6b098e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.SubSystemType;
+
+/**
+ * Contract shared by the per-subsystem cgroup accessor classes: every
+ * implementation identifies the {@link SubSystemType} it works with.
+ */
+public interface CgroupCore {
+
+    /**
+     * @return the subsystem type handled by this implementation
+     */
+    SubSystemType getType();
+
+}

Reply via email to