Repository: storm
Updated Branches:
  refs/heads/master 46999909a -> 10f9086a4


http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
new file mode 100755
index 0000000..054ec0d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Reads and writes the control files of the cgroup {@code cpu} subsystem
+ * that live under a single cgroup directory.
+ */
+public class CpuCore implements CgroupCore {
+
+    public static final String CPU_SHARES = "/cpu.shares";
+    public static final String CPU_RT_RUNTIME_US = "/cpu.rt_runtime_us";
+    public static final String CPU_RT_PERIOD_US = "/cpu.rt_period_us";
+    public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us";
+    public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us";
+    public static final String CPU_STAT = "/cpu.stat";
+
+    private final String dir;
+
+    /**
+     * @param dir the cgroup directory whose cpu control files are accessed
+     */
+    public CpuCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpu;
+    }
+
+    /** Writes {@code weight} to {@code cpu.shares}. */
+    public void setCpuShares(int weight) throws IOException {
+        write(CPU_SHARES, String.valueOf(weight));
+    }
+
+    /** @return the value on the first line of {@code cpu.shares} */
+    public int getCpuShares() throws IOException {
+        return Integer.parseInt(readFirstLine(CPU_SHARES));
+    }
+
+    /** Writes {@code us} to {@code cpu.rt_runtime_us}. */
+    public void setCpuRtRuntimeUs(long us) throws IOException {
+        write(CPU_RT_RUNTIME_US, String.valueOf(us));
+    }
+
+    /** @return the value on the first line of {@code cpu.rt_runtime_us} */
+    public long getCpuRtRuntimeUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_RT_RUNTIME_US));
+    }
+
+    /** Writes {@code us} to {@code cpu.rt_period_us}. */
+    public void setCpuRtPeriodUs(long us) throws IOException {
+        write(CPU_RT_PERIOD_US, String.valueOf(us));
+    }
+
+    /** @return the value on the first line of {@code cpu.rt_period_us} */
+    public Long getCpuRtPeriodUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_RT_PERIOD_US));
+    }
+
+    /** Writes {@code us} to {@code cpu.cfs_period_us}. */
+    public void setCpuCfsPeriodUs(long us) throws IOException {
+        write(CPU_CFS_PERIOD_US, String.valueOf(us));
+    }
+
+    /** @return the value on the first line of {@code cpu.cfs_period_us} */
+    public Long getCpuCfsPeriodUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_CFS_PERIOD_US));
+    }
+
+    /** Writes {@code us} to {@code cpu.cfs_quota_us}. */
+    public void setCpuCfsQuotaUs(long us) throws IOException {
+        write(CPU_CFS_QUOTA_US, String.valueOf(us));
+    }
+
+    /** @return the value on the first line of {@code cpu.cfs_quota_us} */
+    public Long getCpuCfsQuotaUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_CFS_QUOTA_US));
+    }
+
+    /** @return the parsed content of {@code cpu.stat} */
+    public Stat getCpuStat() throws IOException {
+        return new Stat(
+                CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_STAT)));
+    }
+
+    /** Writes {@code value} to the control file {@code file} of this cgroup. */
+    private void write(String file, String value) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, file), value);
+    }
+
+    /**
+     * Returns the trimmed first line of the control file {@code file}.
+     *
+     * @throws IOException if the file could not be read or yielded no lines
+     */
+    private String readFirstLine(String file) throws IOException {
+        List<String> lines =
+                CgroupUtils.readFileByLine(Constants.getDir(this.dir, file));
+        if (lines == null || lines.isEmpty()) {
+            throw new IOException("No content read from " + this.dir + file);
+        }
+        return lines.get(0).trim();
+    }
+
+    /**
+     * Snapshot of {@code cpu.stat}. The first three lines are read, in order,
+     * as {@code nrPeriods}, {@code nrThrottled} and the throttled time; the
+     * value is the second whitespace-separated field of each line.
+     */
+    public static class Stat {
+        public final int nrPeriods;
+        public final int nrThrottled;
+        /**
+         * Throttled time clamped to the {@code int} range. Kept for
+         * compatibility; use {@link #throttledTimeNanos} for the exact value.
+         */
+        public final int throttledTime;
+        /**
+         * Exact throttled time as read from the file. The Linux CFS
+         * bandwidth-control documentation specifies nanoseconds, so the
+         * counter passes {@link Integer#MAX_VALUE} after roughly 2.1 seconds.
+         */
+        public final long throttledTimeNanos;
+
+        /**
+         * @param statStr the lines of {@code cpu.stat}
+         * @throws IllegalArgumentException if fewer than three lines are given
+         *         or a line has no value field
+         * @throws NumberFormatException if a value is not a decimal number
+         */
+        public Stat(List<String> statStr) {
+            if (statStr.size() < 3) {
+                throw new IllegalArgumentException(
+                        "Expected 3 lines of cpu.stat but got " + statStr.size());
+            }
+            this.nrPeriods = saturate(parseValue(statStr.get(0)));
+            this.nrThrottled = saturate(parseValue(statStr.get(1)));
+            this.throttledTimeNanos = parseValue(statStr.get(2));
+            this.throttledTime = saturate(this.throttledTimeNanos);
+        }
+
+        /** Parses the second whitespace-separated field of {@code line}. */
+        private static long parseValue(String line) {
+            String[] fields = line.trim().split("\\s+");
+            if (fields.length < 2) {
+                throw new IllegalArgumentException(
+                        "Malformed cpu.stat line: " + line);
+            }
+            // Parsed as long: Integer.parseInt would throw once a counter
+            // outgrows the int range.
+            return Long.parseLong(fields[1]);
+        }
+
+        /** Clamps {@code value} into the {@code int} range. */
+        private static int saturate(long value) {
+            if (value > Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;
+            }
+            if (value < Integer.MIN_VALUE) {
+                return Integer.MIN_VALUE;
+            }
+            return (int) value;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + nrPeriods;
+            result = prime * result + nrThrottled;
+            // Folds the long; equals the old int-based hash whenever the
+            // value is a non-negative int.
+            result = prime * result
+                    + (int) (throttledTimeNanos ^ (throttledTimeNanos >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            Stat other = (Stat) obj;
+            return nrPeriods == other.nrPeriods
+                    && nrThrottled == other.nrThrottled
+                    && throttledTimeNanos == other.throttledTimeNanos;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
new file mode 100755
index 0000000..56ae2dc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Read-only view of the {@code cpuacct} subsystem control files found under
+ * one cgroup directory.
+ */
+public class CpuacctCore implements CgroupCore {
+
+    public static final String CPUACCT_USAGE = "/cpuacct.usage";
+    public static final String CPUACCT_STAT = "/cpuacct.stat";
+    public static final String CPUACCT_USAGE_PERCPU = "/cpuacct.usage_percpu";
+
+    private final String dir;
+
+    /**
+     * @param dir the cgroup directory whose cpuacct files are read
+     */
+    public CpuacctCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpuacct;
+    }
+
+    /** @return the number on the first line of {@code cpuacct.usage} */
+    public Long getCpuUsage() throws IOException {
+        List<String> lines = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, CPUACCT_USAGE));
+        String firstLine = lines.get(0);
+        return Long.parseLong(firstLine);
+    }
+
+    /**
+     * Reads {@code cpuacct.stat}: the second space-separated field of the
+     * first line is stored under {@link StatType#user}, that of the second
+     * line under {@link StatType#system}.
+     */
+    public Map<StatType, Long> getCpuStat() throws IOException {
+        List<String> lines = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, CPUACCT_STAT));
+        Map<StatType, Long> stats = new HashMap<StatType, Long>();
+        String[] userFields = lines.get(0).split(" ");
+        stats.put(StatType.user, Long.parseLong(userFields[1]));
+        String[] systemFields = lines.get(1).split(" ");
+        stats.put(StatType.system, Long.parseLong(systemFields[1]));
+        return stats;
+    }
+
+    /**
+     * @return one entry per space-separated number on the first line of
+     *         {@code cpuacct.usage_percpu}, in file order
+     */
+    public Long[] getPerCpuUsage() throws IOException {
+        List<String> lines = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU));
+        String[] fields = lines.get(0).split(" ");
+        Long[] usages = new Long[fields.length];
+        int slot = 0;
+        for (String field : fields) {
+            usages[slot] = Long.parseLong(field);
+            slot++;
+        }
+        return usages;
+    }
+
+    /** Keys of the map returned by {@link #getCpuStat()}. */
+    public enum StatType {
+        user, system;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
new file mode 100755
index 0000000..fdb9996
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+/**
+ * Reads and writes the control files of the cgroup {@code cpuset} subsystem
+ * that live under a single cgroup directory.
+ */
+public class CpusetCore implements CgroupCore {
+
+    public static final String CPUSET_CPUS = "/cpuset.cpus";
+    public static final String CPUSET_MEMS = "/cpuset.mems";
+    public static final String CPUSET_MEMORY_MIGRATE =
+            "/cpuset.memory_migrate";
+    public static final String CPUSET_CPU_EXCLUSIVE = "/cpuset.cpu_exclusive";
+    public static final String CPUSET_MEM_EXCLUSIVE = "/cpuset.mem_exclusive";
+    public static final String CPUSET_MEM_HARDWALL = "/cpuset.mem_hardwall";
+    public static final String CPUSET_MEMORY_PRESSURE =
+            "/cpuset.memory_pressure";
+    public static final String CPUSET_MEMORY_PRESSURE_ENABLED =
+            "/cpuset.memory_pressure_enabled";
+    public static final String CPUSET_MEMORY_SPREAD_PAGE =
+            "/cpuset.memory_spread_page";
+    public static final String CPUSET_MEMORY_SPREAD_SLAB =
+            "/cpuset.memory_spread_slab";
+    public static final String CPUSET_SCHED_LOAD_BALANCE =
+            "/cpuset.sched_load_balance";
+    public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL =
+            "/cpuset.sched_relax_domain_level";
+
+    private final String dir;
+
+    /**
+     * @param dir the cgroup directory whose cpuset control files are accessed
+     */
+    public CpusetCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpuset;
+    }
+
+    /**
+     * Writes {@code nums} to {@code cpuset.cpus} as a comma-separated list.
+     * An empty array writes an empty string instead of failing.
+     */
+    public void setCpus(int[] nums) throws IOException {
+        write(CPUSET_CPUS, joinNums(nums));
+    }
+
+    /** @return the ids listed in {@code cpuset.cpus}, ranges expanded */
+    public int[] getCpus() throws IOException {
+        return readNums(CPUSET_CPUS);
+    }
+
+    /**
+     * Writes {@code nums} to {@code cpuset.mems} as a comma-separated list.
+     * An empty array writes an empty string instead of failing.
+     */
+    public void setMems(int[] nums) throws IOException {
+        write(CPUSET_MEMS, joinNums(nums));
+    }
+
+    /** @return the ids listed in {@code cpuset.mems}, ranges expanded */
+    public int[] getMems() throws IOException {
+        return readNums(CPUSET_MEMS);
+    }
+
+    public void setMemMigrate(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_MIGRATE, flag);
+    }
+
+    public boolean isMemMigrate() throws IOException {
+        return readFlag(CPUSET_MEMORY_MIGRATE);
+    }
+
+    public void setCpuExclusive(boolean flag) throws IOException {
+        writeFlag(CPUSET_CPU_EXCLUSIVE, flag);
+    }
+
+    public boolean isCpuExclusive() throws IOException {
+        return readFlag(CPUSET_CPU_EXCLUSIVE);
+    }
+
+    public void setMemExclusive(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEM_EXCLUSIVE, flag);
+    }
+
+    public boolean isMemExclusive() throws IOException {
+        return readFlag(CPUSET_MEM_EXCLUSIVE);
+    }
+
+    public void setMemHardwall(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEM_HARDWALL, flag);
+    }
+
+    public boolean isMemHardwall() throws IOException {
+        return readFlag(CPUSET_MEM_HARDWALL);
+    }
+
+    /** @return the number on the first line of {@code cpuset.memory_pressure} */
+    public int getMemPressure() throws IOException {
+        return readInt(CPUSET_MEMORY_PRESSURE);
+    }
+
+    public void setMemPressureEnabled(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_PRESSURE_ENABLED, flag);
+    }
+
+    public boolean isMemPressureEnabled() throws IOException {
+        return readFlag(CPUSET_MEMORY_PRESSURE_ENABLED);
+    }
+
+    public void setMemSpreadPage(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_SPREAD_PAGE, flag);
+    }
+
+    public boolean isMemSpreadPage() throws IOException {
+        return readFlag(CPUSET_MEMORY_SPREAD_PAGE);
+    }
+
+    public void setMemSpreadSlab(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_SPREAD_SLAB, flag);
+    }
+
+    public boolean isMemSpreadSlab() throws IOException {
+        return readFlag(CPUSET_MEMORY_SPREAD_SLAB);
+    }
+
+    /** Writes the flag to {@code cpuset.sched_load_balance}. */
+    public void setSchedLoadBalance(boolean flag) throws IOException {
+        writeFlag(CPUSET_SCHED_LOAD_BALANCE, flag);
+    }
+
+    /** @return whether {@code cpuset.sched_load_balance} holds a value &gt; 0 */
+    public boolean isSchedLoadBalance() throws IOException {
+        return readFlag(CPUSET_SCHED_LOAD_BALANCE);
+    }
+
+    /**
+     * Misspelled name retained for existing callers; same as
+     * {@link #setSchedLoadBalance(boolean)}.
+     */
+    public void setSchedLoadBlance(boolean flag) throws IOException {
+        setSchedLoadBalance(flag);
+    }
+
+    /**
+     * Misspelled name retained for existing callers; same as
+     * {@link #isSchedLoadBalance()}.
+     */
+    public boolean isSchedLoadBlance() throws IOException {
+        return isSchedLoadBalance();
+    }
+
+    public void setSchedRelaxDomainLevel(int value) throws IOException {
+        write(CPUSET_SCHED_RELAX_DOMAIN_LEVEL, String.valueOf(value));
+    }
+
+    public int getSchedRelaxDomainLevel() throws IOException {
+        return readInt(CPUSET_SCHED_RELAX_DOMAIN_LEVEL);
+    }
+
+    /**
+     * Expands a list such as {@code "0-2,5"} into {@code {0, 1, 2, 5}}.
+     * Items are separated by commas; an item is either a single number or an
+     * inclusive {@code first-last} range (a range whose first value exceeds
+     * its last contributes nothing). Surrounding whitespace and empty items
+     * are ignored, so an empty or blank string yields an empty array.
+     *
+     * @param outputStr the list text, e.g. one line of {@code cpuset.cpus}
+     * @return the listed numbers in the order they appear
+     * @throws NumberFormatException if an item is not a number or a range
+     */
+    public static int[] parseNums(String outputStr) {
+        java.util.List<Integer> numList = new java.util.ArrayList<Integer>();
+        for (String token : outputStr.split(",")) {
+            String item = token.trim();
+            if (item.isEmpty()) {
+                continue;
+            }
+            int hyphen = item.indexOf('-');
+            if (hyphen < 0) {
+                numList.add(Integer.parseInt(item));
+            } else {
+                int first = Integer.parseInt(item.substring(0, hyphen).trim());
+                int last = Integer.parseInt(item.substring(hyphen + 1).trim());
+                for (int num = first; num <= last; num++) {
+                    numList.add(num);
+                }
+            }
+        }
+
+        int[] nums = new int[numList.size()];
+        int index = 0;
+        for (int num : numList) {
+            nums[index] = num;
+            index++;
+        }
+        return nums;
+    }
+
+    /** Joins {@code nums} with commas; an empty array gives {@code ""}. */
+    private static String joinNums(int[] nums) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < nums.length; i++) {
+            if (i > 0) {
+                sb.append(',');
+            }
+            sb.append(nums[i]);
+        }
+        return sb.toString();
+    }
+
+    /** Writes {@code value} to the control file {@code file} of this cgroup. */
+    private void write(String file, String value) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, file), value);
+    }
+
+    /** Writes {@code 1} for {@code true} and {@code 0} for {@code false}. */
+    private void writeFlag(String file, boolean flag) throws IOException {
+        write(file, String.valueOf(flag ? 1 : 0));
+    }
+
+    /** @return whether the number on the first line of {@code file} is &gt; 0 */
+    private boolean readFlag(String file) throws IOException {
+        return readInt(file) > 0;
+    }
+
+    /**
+     * Parses the first line of {@code file} as an int.
+     *
+     * @throws IOException if the file yielded no lines
+     */
+    private int readInt(String file) throws IOException {
+        java.util.List<String> lines =
+                CgroupUtils.readFileByLine(Constants.getDir(this.dir, file));
+        if (lines == null || lines.isEmpty()) {
+            throw new IOException("No content read from " + this.dir + file);
+        }
+        return Integer.parseInt(lines.get(0).trim());
+    }
+
+    /**
+     * Parses the first line of {@code file} as a number list. A file that
+     * yields no lines is treated as an empty list, since a cpuset may have
+     * nothing assigned yet.
+     */
+    private int[] readNums(String file) throws IOException {
+        java.util.List<String> lines =
+                CgroupUtils.readFileByLine(Constants.getDir(this.dir, file));
+        if (lines == null || lines.isEmpty()) {
+            return new int[0];
+        }
+        return parseNums(lines.get(0));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
new file mode 100755
index 0000000..a6896c5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Reads and writes the control files of the cgroup {@code devices}
+ * subsystem that live under a single cgroup directory.
+ */
+public class DevicesCore implements CgroupCore {
+
+    private final String dir;
+
+    public static final String DEVICES_ALLOW = "/devices.allow";
+    public static final String DEVICES_DENY = "/devices.deny";
+    public static final String DEVICES_LIST = "/devices.list";
+
+    public static final char TYPE_ALL = 'a';
+    public static final char TYPE_BLOCK = 'b';
+    public static final char TYPE_CHAR = 'c';
+
+    public static final int ACCESS_READ = 1;
+    public static final int ACCESS_WRITE = 2;
+    public static final int ACCESS_CREATE = 4;
+
+    public static final char ACCESS_READ_CH = 'r';
+    public static final char ACCESS_WRITE_CH = 'w';
+    public static final char ACCESS_CREATE_CH = 'm';
+
+    /**
+     * @param dir the cgroup directory whose devices control files are accessed
+     */
+    public DevicesCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.devices;
+    }
+
+    /**
+     * One {@code type major:minor access} entry of the devices controller.
+     * A {@code *} wildcard in the text form is held as {@code -1} in the
+     * {@link Device} numbers.
+     */
+    public static class Record {
+        /** Numeric stand-in for the {@code *} wildcard. */
+        private static final int WILDCARD = -1;
+
+        Device device;
+        char type;
+        int accesses;
+
+        /**
+         * @param type one of the {@code TYPE_*} characters
+         * @param device major/minor pair; {@code -1} denotes a wildcard
+         * @param accesses bitwise OR of the {@code ACCESS_*} flags
+         */
+        public Record(char type, Device device, int accesses) {
+            this.type = type;
+            this.device = device;
+            this.accesses = accesses;
+        }
+
+        /**
+         * Parses one line in the {@code type major:minor access} form, e.g.
+         * {@code "c 1:3 rwm"} or {@code "a *:* rwm"}.
+         *
+         * @throws IllegalArgumentException if the line does not have a type,
+         *         two device numbers and an access field
+         * @throws NumberFormatException if a device number is neither a
+         *         decimal number nor {@code *}
+         */
+        public Record(String output) {
+            // Literal replace (no regex needed) so the numbers parse below.
+            String normalized =
+                    output.trim().replace("*", String.valueOf(WILDCARD));
+            String[] splits = normalized.split("[: ]");
+            if (splits.length < 4 || splits[0].isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Malformed device record: " + output);
+            }
+            type = splits[0].charAt(0);
+            int major = Integer.parseInt(splits[1]);
+            int minor = Integer.parseInt(splits[2]);
+            device = new Device(major, minor);
+            accesses = 0;
+            for (char c : splits[3].toCharArray()) {
+                if (c == ACCESS_READ_CH) {
+                    accesses |= ACCESS_READ;
+                } else if (c == ACCESS_CREATE_CH) {
+                    accesses |= ACCESS_CREATE;
+                } else if (c == ACCESS_WRITE_CH) {
+                    accesses |= ACCESS_WRITE;
+                }
+            }
+        }
+
+        /**
+         * Renders the record as {@code type major:minor access}. A device
+         * number of {@code -1} is written as {@code *}, so a record parsed
+         * from a wildcard line turns back into the same text.
+         */
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(type);
+            sb.append(' ');
+            if (device.major == WILDCARD) {
+                sb.append('*');
+            } else {
+                sb.append(device.major);
+            }
+            sb.append(':');
+            if (device.minor == WILDCARD) {
+                sb.append('*');
+            } else {
+                sb.append(device.minor);
+            }
+            sb.append(' ');
+            sb.append(getAccessesFlag(accesses));
+
+            return sb.toString();
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + accesses;
+            result = prime * result
+                    + ((device == null) ? 0 : device.hashCode());
+            result = prime * result + type;
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            Record other = (Record) obj;
+            if (accesses != other.accesses || type != other.type) {
+                return false;
+            }
+            if (device == null) {
+                return other.device == null;
+            }
+            return device.equals(other.device);
+        }
+
+        /**
+         * @param output lines in the {@code type major:minor access} form
+         * @return one record per line, in the same order
+         */
+        public static Record[] parseRecordList(List<String> output) {
+            Record[] records = new Record[output.size()];
+            for (int i = 0, l = output.size(); i < l; i++) {
+                records[i] = new Record(output.get(i));
+            }
+
+            return records;
+        }
+
+        /**
+         * @param accesses bitwise OR of the {@code ACCESS_*} flags
+         * @return the matching access characters, in the order r, w, m
+         */
+        public static StringBuilder getAccessesFlag(int accesses) {
+            StringBuilder sb = new StringBuilder();
+            if ((accesses & ACCESS_READ) != 0) {
+                sb.append(ACCESS_READ_CH);
+            }
+            if ((accesses & ACCESS_WRITE) != 0) {
+                sb.append(ACCESS_WRITE_CH);
+            }
+            if ((accesses & ACCESS_CREATE) != 0) {
+                sb.append(ACCESS_CREATE_CH);
+            }
+            return sb;
+        }
+    }
+
+    /** Writes the text form of the described record to {@code prop}. */
+    private void setPermission(String prop, char type, Device device,
+            int accesses) throws IOException {
+        Record record = new Record(type, device, accesses);
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop),
+                record.toString());
+    }
+
+    /** Writes the described record to {@code devices.allow}. */
+    public void setAllow(char type, Device device, int accesses)
+            throws IOException {
+        setPermission(DEVICES_ALLOW, type, device, accesses);
+    }
+
+    /** Writes the described record to {@code devices.deny}. */
+    public void setDeny(char type, Device device, int accesses)
+            throws IOException {
+        setPermission(DEVICES_DENY, type, device, accesses);
+    }
+
+    /** @return the records parsed from the lines of {@code devices.list} */
+    public Record[] getList() throws IOException {
+        List<String> output = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, DEVICES_LIST));
+        return Record.parseRecordList(output);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
new file mode 100755
index 0000000..65b8989
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+
+/**
+ * Reads and writes {@code freezer.state} of the cgroup {@code freezer}
+ * subsystem under a single cgroup directory.
+ */
+public class FreezerCore implements CgroupCore {
+
+    public static final String FREEZER_STATE = "/freezer.state";
+
+    private final String dir;
+
+    /**
+     * @param dir the cgroup directory whose freezer state file is accessed
+     */
+    public FreezerCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.freezer;
+    }
+
+    /**
+     * Writes the upper-cased name of {@code state} to {@code freezer.state}.
+     */
+    public void setState(State state) throws IOException {
+        CgroupUtils.writeFileByLine(
+                Constants.getDir(this.dir, FREEZER_STATE), State.label(state));
+    }
+
+    /**
+     * @return the state named on the first line of {@code freezer.state}, or
+     *         {@code null} if that line is not a known state
+     * @throws IOException if the file could not be read or yielded no lines
+     */
+    public State getState() throws IOException {
+        java.util.List<String> lines = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, FREEZER_STATE));
+        if (lines == null || lines.isEmpty()) {
+            throw new IOException(
+                    "No content read from " + this.dir + FREEZER_STATE);
+        }
+        return State.getStateValue(lines.get(0));
+    }
+
+    /** States that can appear in {@code freezer.state}. */
+    public enum State {
+        frozen, freezing, thawed;
+
+        /**
+         * Upper-cases the constant name with {@code Locale.ROOT}; the
+         * default-locale variant would give {@code FREEZİNG} under a Turkish
+         * locale.
+         */
+        private static String label(State state) {
+            return state.name().toUpperCase(java.util.Locale.ROOT);
+        }
+
+        /**
+         * Maps {@code FROZEN}, {@code FREEZING} or {@code THAWED} (case
+         * sensitive, surrounding whitespace ignored) to its constant.
+         *
+         * @return the matching state, or {@code null} if {@code state} is
+         *         {@code null} or not one of the three names
+         */
+        public static State getStateValue(String state) {
+            if (state == null) {
+                return null;
+            }
+            String wanted = state.trim();
+            for (State candidate : values()) {
+                if (label(candidate).equals(wanted)) {
+                    return candidate;
+                }
+            }
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
new file mode 100755
index 0000000..98be198
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+
+/**
+ * Reads and writes the {@code memory.*} control files found under one cgroup
+ * directory.
+ */
+public class MemoryCore implements CgroupCore {
+
+    public static final String MEMORY_STAT = "/memory.stat";
+    public static final String MEMORY_USAGE_IN_BYTES =
+            "/memory.usage_in_bytes";
+    public static final String MEMORY_MEMSW_USAGE_IN_BYTES =
+            "/memory.memsw.usage_in_bytes";
+    public static final String MEMORY_MAX_USAGE_IN_BYTES =
+            "/memory.max_usage_in_bytes";
+    public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES =
+            "/memory.memsw.max_usage_in_bytes";
+    public static final String MEMORY_LIMIT_IN_BYTES =
+            "/memory.limit_in_bytes";
+    public static final String MEMORY_MEMSW_LIMIT_IN_BYTES =
+            "/memory.memsw.limit_in_bytes";
+    public static final String MEMORY_FAILCNT = "/memory.failcnt";
+    public static final String MEMORY_MEMSW_FAILCNT = "/memory.memsw.failcnt";
+    public static final String MEMORY_FORCE_EMPTY = "/memory.force_empty";
+    public static final String MEMORY_SWAPPINESS = "/memory.swappiness";
+    public static final String MEMORY_USE_HIERARCHY = "/memory.use_hierarchy";
+    public static final String MEMORY_OOM_CONTROL = "/memory.oom_control";
+
+    private final String dir;
+
+    /**
+     * @param dir cgroup directory that contains the memory.* files
+     */
+    public MemoryCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.memory;
+    }
+
+    /**
+     * Immutable snapshot of the counters listed in memory.stat.
+     */
+    public static class Stat {
+        public final long cacheSize;
+        public final long rssSize;
+        public final long mappedFileSize;
+        public final long pgpginNum;
+        public final long pgpgoutNum;
+        public final long swapSize;
+        public final long activeAnonSize;
+        public final long inactiveAnonSize;
+        public final long activeFileSize;
+        public final long inactiveFileSize;
+        public final long unevictableSize;
+        public final long hierarchicalMemoryLimitSize;
+        public final long hierarchicalMemSwapLimitSize;
+        public final long totalCacheSize;
+        public final long totalRssSize;
+        public final long totalMappedFileSize;
+        public final long totalPgpginNum;
+        public final long totalPgpgoutNum;
+        public final long totalSwapSize;
+        public final long totalActiveAnonSize;
+        public final long totalInactiveAnonSize;
+        public final long totalActiveFileSize;
+        public final long totalInactiveFileSize;
+        public final long totalUnevictableSize;
+        public final long totalHierarchicalMemoryLimitSize;
+        public final long totalHierarchicalMemSwapLimitSize;
+
+        /**
+         * Parses the text of memory.stat.
+         *
+         * <p>Each line may be either {@code "<key> <value>"} or a bare
+         * number. Keyed lines are matched by key regardless of position;
+         * bare numbers are matched by line position. A counter that is not
+         * present is reported as 0.
+         *
+         * @param output newline-separated content of memory.stat
+         * @throws NumberFormatException if a matched value is not a number
+         */
+        public Stat(String output) {
+            String[] lines = output.split("\n");
+            this.cacheSize = valueOf(lines, "cache", 0);
+            this.rssSize = valueOf(lines, "rss", 1);
+            this.mappedFileSize = valueOf(lines, "mapped_file", 2);
+            this.pgpginNum = valueOf(lines, "pgpgin", 3);
+            this.pgpgoutNum = valueOf(lines, "pgpgout", 4);
+            this.swapSize = valueOf(lines, "swap", 5);
+            this.inactiveAnonSize = valueOf(lines, "inactive_anon", 6);
+            this.activeAnonSize = valueOf(lines, "active_anon", 7);
+            this.inactiveFileSize = valueOf(lines, "inactive_file", 8);
+            this.activeFileSize = valueOf(lines, "active_file", 9);
+            this.unevictableSize = valueOf(lines, "unevictable", 10);
+            this.hierarchicalMemoryLimitSize =
+                    valueOf(lines, "hierarchical_memory_limit", 11);
+            this.hierarchicalMemSwapLimitSize =
+                    valueOf(lines, "hierarchical_memsw_limit", 12);
+            this.totalCacheSize = valueOf(lines, "total_cache", 13);
+            this.totalRssSize = valueOf(lines, "total_rss", 14);
+            this.totalMappedFileSize =
+                    valueOf(lines, "total_mapped_file", 15);
+            this.totalPgpginNum = valueOf(lines, "total_pgpgin", 16);
+            this.totalPgpgoutNum = valueOf(lines, "total_pgpgout", 17);
+            this.totalSwapSize = valueOf(lines, "total_swap", 18);
+            this.totalInactiveAnonSize =
+                    valueOf(lines, "total_inactive_anon", 19);
+            this.totalActiveAnonSize =
+                    valueOf(lines, "total_active_anon", 20);
+            this.totalInactiveFileSize =
+                    valueOf(lines, "total_inactive_file", 21);
+            this.totalActiveFileSize =
+                    valueOf(lines, "total_active_file", 22);
+            this.totalUnevictableSize =
+                    valueOf(lines, "total_unevictable", 23);
+            // NOTE(review): these two keys may not be emitted by the kernel
+            // at all; when absent the fields stay 0 - confirm whether they
+            // are still needed.
+            this.totalHierarchicalMemoryLimitSize =
+                    valueOf(lines, "total_hierarchical_memory_limit", 24);
+            this.totalHierarchicalMemSwapLimitSize =
+                    valueOf(lines, "total_hierarchical_memsw_limit", 25);
+        }
+
+        /**
+         * Finds one counter: first a "key value" line with this key, then a
+         * bare number at {@code index}, otherwise 0.
+         */
+        private static long valueOf(String[] lines, String key, int index) {
+            for (String line : lines) {
+                String[] tokens = line.trim().split("\\s+");
+                if (tokens.length >= 2 && tokens[0].equals(key)) {
+                    return Long.parseLong(tokens[tokens.length - 1]);
+                }
+            }
+            if (index < lines.length) {
+                String[] tokens = lines[index].trim().split("\\s+");
+                // Only a lone token is positional data; a keyed line for a
+                // different counter must not be mistaken for this one.
+                if (tokens.length == 1 && !tokens[0].isEmpty()) {
+                    return Long.parseLong(tokens[0]);
+                }
+            }
+            return 0L;
+        }
+    }
+
+    /**
+     * Reads every line of memory.stat and parses it into a {@link Stat}.
+     *
+     * @throws IOException if the read fails
+     */
+    public Stat getStat() throws IOException {
+        // Stat expects the whole file, so join all lines rather than
+        // handing over only the first one.
+        StringBuilder content = new StringBuilder();
+        for (String line : CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, MEMORY_STAT))) {
+            content.append(line).append('\n');
+        }
+        return new Stat(content.toString());
+    }
+
+    /** Returns the first line of memory.usage_in_bytes as a long. */
+    public long getPhysicalUsage() throws IOException {
+        return readLong(MEMORY_USAGE_IN_BYTES);
+    }
+
+    /** Returns the first line of memory.memsw.usage_in_bytes as a long. */
+    public long getWithSwapUsage() throws IOException {
+        return readLong(MEMORY_MEMSW_USAGE_IN_BYTES);
+    }
+
+    /** Returns the first line of memory.max_usage_in_bytes as a long. */
+    public long getMaxPhysicalUsage() throws IOException {
+        return readLong(MEMORY_MAX_USAGE_IN_BYTES);
+    }
+
+    /** Returns the first line of memory.memsw.max_usage_in_bytes as a long. */
+    public long getMaxWithSwapUsage() throws IOException {
+        return readLong(MEMORY_MEMSW_MAX_USAGE_IN_BYTES);
+    }
+
+    /** Writes {@code value} to memory.limit_in_bytes. */
+    public void setPhysicalUsageLimit(long value) throws IOException {
+        writeValue(MEMORY_LIMIT_IN_BYTES, String.valueOf(value));
+    }
+
+    /** Returns the first line of memory.limit_in_bytes as a long. */
+    public long getPhysicalUsageLimit() throws IOException {
+        return readLong(MEMORY_LIMIT_IN_BYTES);
+    }
+
+    /** Writes {@code value} to memory.memsw.limit_in_bytes. */
+    public void setWithSwapUsageLimit(long value) throws IOException {
+        writeValue(MEMORY_MEMSW_LIMIT_IN_BYTES, String.valueOf(value));
+    }
+
+    /** Returns the first line of memory.memsw.limit_in_bytes as a long. */
+    public long getWithSwapUsageLimit() throws IOException {
+        return readLong(MEMORY_MEMSW_LIMIT_IN_BYTES);
+    }
+
+    /** Returns the first line of memory.failcnt as an int. */
+    public int getPhysicalFailCount() throws IOException {
+        return readInt(MEMORY_FAILCNT);
+    }
+
+    /** Returns the first line of memory.memsw.failcnt as an int. */
+    public int getWithSwapFailCount() throws IOException {
+        return readInt(MEMORY_MEMSW_FAILCNT);
+    }
+
+    /** Writes "0" to memory.force_empty. */
+    public void clearForceEmpty() throws IOException {
+        writeValue(MEMORY_FORCE_EMPTY, String.valueOf(0));
+    }
+
+    /** Writes {@code value} to memory.swappiness. */
+    public void setSwappiness(int value) throws IOException {
+        writeValue(MEMORY_SWAPPINESS, String.valueOf(value));
+    }
+
+    /** Returns the first line of memory.swappiness as an int. */
+    public int getSwappiness() throws IOException {
+        return readInt(MEMORY_SWAPPINESS);
+    }
+
+    /** Writes "1" (true) or "0" (false) to memory.use_hierarchy. */
+    public void setUseHierarchy(boolean flag) throws IOException {
+        writeValue(MEMORY_USE_HIERARCHY, String.valueOf(flag ? 1 : 0));
+    }
+
+    /** Returns true when memory.use_hierarchy holds a value above zero. */
+    public boolean isUseHierarchy() throws IOException {
+        return readInt(MEMORY_USE_HIERARCHY) > 0;
+    }
+
+    /** Writes "1" (true) or "0" (false) to memory.oom_control. */
+    public void setOomControl(boolean flag) throws IOException {
+        writeValue(MEMORY_OOM_CONTROL, String.valueOf(flag ? 1 : 0));
+    }
+
+    /**
+     * Returns true when the second whitespace-separated token on the first
+     * line of memory.oom_control is a number above zero.
+     */
+    public boolean isOomControl() throws IOException {
+        String output = readFirstLine(MEMORY_OOM_CONTROL);
+        output = output.split("\n")[0].split("[\\s]")[1];
+        int value = Integer.parseInt(output);
+        return value > 0;
+    }
+
+    /** Returns the first line of the given control file. */
+    private String readFirstLine(String fileName) throws IOException {
+        return CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, fileName)).get(0);
+    }
+
+    /** Parses the first line of the given control file as a long. */
+    private long readLong(String fileName) throws IOException {
+        return Long.parseLong(readFirstLine(fileName));
+    }
+
+    /** Parses the first line of the given control file as an int. */
+    private int readInt(String fileName) throws IOException {
+        return Integer.parseInt(readFirstLine(fileName));
+    }
+
+    /** Writes one value to the given control file. */
+    private void writeValue(String fileName, String value)
+            throws IOException {
+        CgroupUtils.writeFileByLine(
+                Constants.getDir(this.dir, fileName), value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
new file mode 100755
index 0000000..979eaad
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+
+import java.io.IOException;
+
+/**
+ * Reads and writes the {@code net_cls.classid} file found under one cgroup
+ * directory. A class id is handled as two 16-bit halves: the major handle
+ * in the upper half and the minor handle in the lower half.
+ */
+public class NetClsCore implements CgroupCore {
+
+    public static final String NET_CLS_CLASSID = "/net_cls.classid";
+
+    private final String dir;
+
+    /**
+     * @param dir cgroup directory that contains the net_cls.classid file
+     */
+    public NetClsCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.net_cls;
+    }
+
+    /**
+     * Formats the low 16 bits of {@code num} as exactly four lower-case
+     * hexadecimal digits, zero-padded on the left.
+     */
+    private StringBuilder toHex(int num) {
+        // Mask first so that values wider than 16 bits (or negative ones)
+        // can never produce more than four digits.
+        return new StringBuilder(String.format("%04x", num & 0xFFFF));
+    }
+
+    /**
+     * Writes the class id as {@code 0xMMMMmmmm}, where MMMM is the major
+     * handle and mmmm the minor handle, both in hexadecimal.
+     *
+     * @param major major handle; only the low 16 bits are used
+     * @param minor minor handle; only the low 16 bits are used
+     * @throws IOException if the write fails
+     */
+    public void setClassId(int major, int minor) throws IOException {
+        StringBuilder sb = new StringBuilder("0x");
+        sb.append(toHex(major));
+        sb.append(toHex(minor));
+        CgroupUtils.writeFileByLine(
+                Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
+    }
+
+    /**
+     * Reads the class id, which the file holds as a decimal number, and
+     * splits it into its major (upper 16 bits) and minor (lower 16 bits)
+     * handles.
+     *
+     * @return the handles wrapped in a {@link Device}
+     * @throws IOException if the read fails
+     * @throws NumberFormatException if the first line is not a number
+     */
+    public Device getClassId() throws IOException {
+        String output = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0);
+        // A full 32-bit class id does not fit a signed int, so parse as
+        // long and extract the halves arithmetically instead of slicing a
+        // hex string (which failed for short values and for digits a-f).
+        long classId = Long.parseLong(output.trim());
+        int major = (int) ((classId >>> 16) & 0xFFFF);
+        int minor = (int) (classId & 0xFFFF);
+        return new Device(major, minor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
new file mode 100755
index 0000000..95c1a40
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads and writes the {@code net_prio.*} control files found under one
+ * cgroup directory.
+ */
+public class NetPrioCore implements CgroupCore {
+
+    public static final String NET_PRIO_PRIOIDX = "/net_prio.prioidx";
+    public static final String NET_PRIO_IFPRIOMAP = "/net_prio.ifpriomap";
+
+    private final String dir;
+
+    /**
+     * @param dir cgroup directory that contains the net_prio.* files
+     */
+    public NetPrioCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.net_prio;
+    }
+
+    /**
+     * Returns the first line of net_prio.prioidx parsed as an int.
+     *
+     * @throws IOException if the read fails
+     */
+    public int getPrioId() throws IOException {
+        String firstLine = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0);
+        return Integer.parseInt(firstLine);
+    }
+
+    /**
+     * Writes {@code "<iface> <priority>"} to net_prio.ifpriomap.
+     *
+     * @param iface    interface name
+     * @param priority priority to write for that interface
+     * @throws IOException if the write fails
+     */
+    public void setIfPrioMap(String iface, int priority) throws IOException {
+        String entry = iface + ' ' + priority;
+        CgroupUtils.writeFileByLine(
+                Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), entry);
+    }
+
+    /**
+     * Reads net_prio.ifpriomap and returns its entries; each line is split
+     * on a single space into a name and an integer.
+     *
+     * @return map from the first token of each line to the second token
+     * @throws IOException if the read fails
+     */
+    public Map<String, Integer> getIfPrioMap() throws IOException {
+        Map<String, Integer> priorities = new HashMap<String, Integer>();
+        List<String> lines = CgroupUtils.readFileByLine(
+                Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP));
+        for (int i = 0; i < lines.size(); i++) {
+            String[] parts = lines.get(i).split(" ");
+            priorities.put(parts[0], Integer.valueOf(parts[1]));
+        }
+        return priorities;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index a0c0b1a..adaafb6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -52,7 +52,6 @@ import org.apache.thrift.TSerializer;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
-import org.eclipse.jetty.util.log.Log;
 import org.json.simple.JSONValue;
 import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
@@ -1454,7 +1453,7 @@ public class Utils {
      * @return boolean whether or not the directory exists in the zip.
      */
     public static boolean zipDoesContainDir(String zipfile, String target) 
throws IOException {
-        List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new 
ZipFile(zipfile).entries());
+        List<ZipEntry> entries = (List<ZipEntry>) Collections.list(new 
ZipFile(zipfile).entries());
 
         String targetDir = target + "/";
         for(ZipEntry entry : entries) {

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj 
b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 9c31ddf..956abe8 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -22,7 +22,7 @@
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout 
TestGlobalCount TestAggregatesCounter TestPlannerSpout])
   (:import [org.apache.storm.scheduler ISupervisor])
   (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
-  (:import [org.apache.storm.generated RebalanceOptions])
+  (:import [org.apache.storm.generated RebalanceOptions WorkerResources])
   (:import [org.mockito Matchers Mockito])
   (:import [java.util UUID])
   (:import [java.io File])
@@ -291,7 +291,6 @@
     (let [mock-port "42"
           mock-storm-id "fake-storm-id"
           mock-worker-id "fake-worker-id"
-          mock-mem-onheap 512
           mock-cp (str Utils/FILE_PATH_SEPARATOR "base" 
Utils/CLASS_PATH_SEPARATOR Utils/FILE_PATH_SEPARATOR "stormjar.jar")
           mock-sensitivity "S3"
           mock-cp "/base:/stormjar.jar"
@@ -358,7 +357,7 @@
                                       mock-storm-id
                                       mock-port
                                       mock-worker-id
-                                      mock-mem-onheap)
+                                      (WorkerResources.))
                 (. (Mockito/verify utils-spy)
                    (launchProcessImpl (Matchers/eq exp-args)
                                       (Matchers/any)
@@ -394,7 +393,7 @@
                                             mock-storm-id
                                             mock-port
                                             mock-worker-id
-                                            mock-mem-onheap)
+                                            (WorkerResources.))
                   (. (Mockito/verify utils-spy)
                      (launchProcessImpl (Matchers/eq exp-args)
                                         (Matchers/any)
@@ -428,7 +427,7 @@
                                               mock-storm-id
                                               mock-port
                                               mock-worker-id
-                                              mock-mem-onheap)
+                                              (WorkerResources.))
                   (. (Mockito/verify utils-spy)
                      (launchProcessImpl (Matchers/eq exp-args)
                                         (Matchers/any)
@@ -462,7 +461,7 @@
                                         mock-storm-id
                                         mock-port
                                         mock-worker-id
-                                        mock-mem-onheap)
+                                        (WorkerResources.))
               (. (Mockito/verify utils-spy)
                  (launchProcessImpl (Matchers/any)
                                     (Matchers/eq full-env)
@@ -475,7 +474,6 @@
     (let [mock-port "42"
           mock-storm-id "fake-storm-id"
           mock-worker-id "fake-worker-id"
-          mock-mem-onheap 512
           mock-sensitivity "S3"
           mock-cp "mock-classpath'quote-on-purpose"
           attrs (make-array FileAttribute 0)
@@ -554,7 +552,7 @@
                                           mock-storm-id
                                           mock-port
                                           mock-worker-id
-                                          mock-mem-onheap)
+                                          (WorkerResources.))
                 (. (Mockito/verify utils-spy)
                    (launchProcessImpl (Matchers/eq exp-launch)
                                       (Matchers/any)
@@ -596,7 +594,7 @@
                                           mock-storm-id
                                           mock-port
                                           mock-worker-id
-                                          mock-mem-onheap)
+                                          (WorkerResources.))
                 (. (Mockito/verify utils-spy)
                  (launchProcessImpl (Matchers/eq exp-launch)
                                     (Matchers/any)

http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/test/jvm/org/apache/storm/TestCgroups.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestCgroups.java 
b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
new file mode 100644
index 0000000..f19ffc2
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Unit tests for CGroups.
+ */
+public class TestCgroups {
+
+    /**
+     * Test whether cgroups are set up correctly for use.  Also tests
+     * whether Cgroups produces the right command to start a worker and
+     * cleans up correctly after the worker is shut down.
+     */
+    @Test
+    public void testSetupAndTearDown() throws IOException {
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        // We don't want to run the test if CGroups are not set up.
+        // Boolean.TRUE.equals() also skips (instead of erroring out) when
+        // the setting is missing or is not a Boolean.
+        Assume.assumeTrue("Check if CGroups are setup",
+                Boolean.TRUE.equals(config.get(Config.STORM_CGROUP_ENABLE)));
+
+        Assert.assertTrue("Check if STORM_CGROUP_HIERARCHY_DIR exists",
+                stormCgroupHierarchyExists(config));
+        Assert.assertTrue("Check if STORM_SUPERVISOR_CGROUP_ROOTDIR exists",
+                stormCgroupSupervisorRootDirExists(config));
+
+        CgroupManager manager = new CgroupManager();
+        manager.prepare(config);
+
+        Map<String, Object> resourcesMap = new HashMap<String, Object>();
+        resourcesMap.put("cpu", 200);
+        resourcesMap.put("memory", 1024);
+        String workerId = UUID.randomUUID().toString();
+        String command = manager.startNewWorker(workerId, resourcesMap);
+
+        // The subsystem list may come back in either order, so accept both.
+        String correctCommand1 = config.get(Config.STORM_CGROUP_CGEXEC_CMD)
+                + " -g memory,cpu:/"
+                + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR)
+                + "/" + workerId;
+        String correctCommand2 = config.get(Config.STORM_CGROUP_CGEXEC_CMD)
+                + " -g cpu,memory:/"
+                + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR)
+                + "/" + workerId;
+        Assert.assertTrue("Check if cgroup launch command is correct",
+                command.equals(correctCommand1)
+                        || command.equals(correctCommand2));
+
+        String pathToWorkerCgroupDir =
+                ((String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR))
+                + "/"
+                + ((String) config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR))
+                + "/" + workerId;
+
+        Assert.assertTrue("Check if cgroup directory exists for worker",
+                dirExists(pathToWorkerCgroupDir));
+
+        /* validate cpu settings */
+
+        String pathToCpuShares = pathToWorkerCgroupDir + "/cpu.shares";
+        Assert.assertTrue("Check if cpu.shares file exists",
+                fileExists(pathToCpuShares));
+        Assert.assertEquals(
+                "Check if the correct value is written into cpu.shares",
+                "200", readFileAll(pathToCpuShares));
+
+        /* validate memory settings */
+
+        String pathTomemoryLimitInBytes =
+                pathToWorkerCgroupDir + "/memory.limit_in_bytes";
+
+        Assert.assertTrue("Check if memory.limit_in_bytes file exists",
+                fileExists(pathTomemoryLimitInBytes));
+        Assert.assertEquals(
+                "Check if the correct value is written into memory.limit_in_bytes",
+                String.valueOf(1024 * 1024 * 1024),
+                readFileAll(pathTomemoryLimitInBytes));
+
+        manager.shutDownWorker(workerId, true);
+
+        Assert.assertFalse("Make sure cgroup was removed properly",
+                dirExists(pathToWorkerCgroupDir));
+    }
+
+    /** Returns true when the configured cgroup hierarchy directory exists. */
+    private boolean stormCgroupHierarchyExists(Map<?, ?> config) {
+        String pathToStormCgroupHierarchy =
+                (String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR);
+        return dirExists(pathToStormCgroupHierarchy);
+    }
+
+    /**
+     * Returns true when the supervisor root directory exists below the
+     * configured cgroup hierarchy directory.
+     */
+    private boolean stormCgroupSupervisorRootDirExists(Map<?, ?> config) {
+        String pathTostormCgroupSupervisorRootDir =
+                ((String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR))
+                + "/"
+                + ((String) config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR));
+
+        return dirExists(pathTostormCgroupSupervisorRootDir);
+    }
+
+    /** Returns true when {@code rawPath} exists and is a directory. */
+    private boolean dirExists(String rawPath) {
+        File path = new File(rawPath);
+        return path.exists() && path.isDirectory();
+    }
+
+    /** Returns true when {@code rawPath} exists and is not a directory. */
+    private boolean fileExists(String rawPath) {
+        File path = new File(rawPath);
+        return path.exists() && !path.isDirectory();
+    }
+
+    /** Reads the whole file as UTF-8 text and trims surrounding whitespace. */
+    private String readFileAll(String filePath) throws IOException {
+        byte[] data = Files.readAllBytes(Paths.get(filePath));
+        // Explicit charset: the default one differs between platforms.
+        return new String(data,
+                java.nio.charset.StandardCharsets.UTF_8).trim();
+    }
+}

Reply via email to