Repository: storm Updated Branches: refs/heads/master 46999909a -> 10f9086a4
http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java new file mode 100755 index 0000000..054ec0d --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; + +import java.io.IOException; +import java.util.List; + +public class CpuCore implements CgroupCore { + + public static final String CPU_SHARES = "/cpu.shares"; + public static final String CPU_RT_RUNTIME_US = "/cpu.rt_runtime_us"; + public static final String CPU_RT_PERIOD_US = "/cpu.rt_period_us"; + public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us"; + public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us"; + public static final String CPU_STAT = "/cpu.stat"; + + private final String dir; + + public CpuCore(String dir) { + this.dir = dir; + } + + @Override + public SubSystemType getType() { + return SubSystemType.cpu; + } + + public void setCpuShares(int weight) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight)); + } + + public int getCpuShares() throws IOException { + return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0)); + } + + public void setCpuRtRuntimeUs(long us) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US), String.valueOf(us)); + } + + public long getCpuRtRuntimeUs() throws IOException { + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0)); + } + + public void setCpuRtPeriodUs(long us) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US), String.valueOf(us)); + } + + public Long getCpuRtPeriodUs() throws IOException { + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0)); + } + + public void setCpuCfsPeriodUs(long us) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, 
CPU_CFS_PERIOD_US), String.valueOf(us)); + } + + public Long getCpuCfsPeriodUs() throws IOException { + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0)); + } + + public void setCpuCfsQuotaUs(long us) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us)); + } + + public Long getCpuCfsQuotaUs() throws IOException { + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0)); + } + + public Stat getCpuStat() throws IOException { + return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_STAT))); + } + + public static class Stat { + public final int nrPeriods; + public final int nrThrottled; + public final int throttledTime; + + public Stat(List<String> statStr) { + this.nrPeriods = Integer.parseInt(statStr.get(0).split(" ")[1]); + this.nrThrottled = Integer.parseInt(statStr.get(1).split(" ")[1]); + this.throttledTime = Integer.parseInt(statStr.get(2).split(" ")[1]); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + nrPeriods; + result = prime * result + nrThrottled; + result = prime * result + throttledTime; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Stat other = (Stat) obj; + if (nrPeriods != other.nrPeriods) { + return false; + } + if (nrThrottled != other.nrThrottled) { + return false; + } + if (throttledTime != other.throttledTime) { + return false; + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java new file mode 100755 index 0000000..56ae2dc --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CpuacctCore implements CgroupCore { + + public static final String CPUACCT_USAGE = "/cpuacct.usage"; + public static final String CPUACCT_STAT = "/cpuacct.stat"; + public static final String CPUACCT_USAGE_PERCPU = "/cpuacct.usage_percpu"; + + private final String dir; + + public CpuacctCore(String dir) { + this.dir = dir; + } + + @Override + public SubSystemType getType() { + return SubSystemType.cpuacct; + } + + public Long getCpuUsage() throws IOException { + return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE)).get(0)); + } + + public Map<StatType, Long> getCpuStat() throws IOException { + List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_STAT)); + Map<StatType, Long> result = new HashMap<StatType, Long>(); + result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1])); + result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1])); + return result; + } + + public Long[] getPerCpuUsage() throws IOException { + String str = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0); + String[] strArgs = str.split(" "); + Long[] result = new Long[strArgs.length]; + for (int i = 0; i < result.length; i++) { + result[i] = Long.parseLong(strArgs[i]); + } + return result; + } + + public enum StatType { + user, system; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java new file mode 100755 index 0000000..fdb9996 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; + +import java.io.IOException; +import java.util.LinkedList; + +public class CpusetCore implements CgroupCore { + + public static final String CPUSET_CPUS = "/cpuset.cpus"; + public static final String CPUSET_MEMS = "/cpuset.mems"; + public static final String CPUSET_MEMORY_MIGRATE = "/cpuset.memory_migrate"; + public static final String CPUSET_CPU_EXCLUSIVE = "/cpuset.cpu_exclusive"; + public static final String CPUSET_MEM_EXCLUSIVE = "/cpuset.mem_exclusive"; + public static final String CPUSET_MEM_HARDWALL = "/cpuset.mem_hardwall"; + public static final String CPUSET_MEMORY_PRESSURE = "/cpuset.memory_pressure"; + public static final String CPUSET_MEMORY_PRESSURE_ENABLED = "/cpuset.memory_pressure_enabled"; + public static final String CPUSET_MEMORY_SPREAD_PAGE = "/cpuset.memory_spread_page"; + public static final String CPUSET_MEMORY_SPREAD_SLAB = "/cpuset.memory_spread_slab"; + public static final String CPUSET_SCHED_LOAD_BALANCE = "/cpuset.sched_load_balance"; + public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL = "/cpuset.sched_relax_domain_level"; + + private final String dir; + + public CpusetCore(String dir) { + this.dir = dir; + } + + @Override + public SubSystemType getType() { + return SubSystemType.cpuset; + } + + public void setCpus(int[] nums) throws IOException { + StringBuilder sb = new StringBuilder(); + for (int num : nums) { + sb.append(num); + sb.append(','); + } + sb.deleteCharAt(sb.length() - 1); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS), sb.toString()); + } + + public int[] getCpus() throws IOException { + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPUS)).get(0); + return parseNums(output); + } + + public void setMems(int[] nums) throws IOException { + 
StringBuilder sb = new StringBuilder(); + for (int num : nums) { + sb.append(num); + sb.append(','); + } + sb.deleteCharAt(sb.length() - 1); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS), sb.toString()); + } + + public int[] getMems() throws IOException { + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMS)).get(0); + return parseNums(output); + } + + public void setMemMigrate(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE), String.valueOf(flag ? 1 : 0)); + } + + public boolean isMemMigrate() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(0)); + return output > 0; + } + + public void setCpuExclusive(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE), String.valueOf(flag ? 1 : 0)); + } + + public boolean isCpuExclusive() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE)).get(0)); + return output > 0; + } + + public void setMemExclusive(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE), String.valueOf(flag ? 1 : 0)); + } + + public boolean isMemExclusive() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE)).get(0)); + return output > 0; + } + + public void setMemHardwall(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL), String.valueOf(flag ? 
1 : 0)); + } + + public boolean isMemHardwall() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0)); + return output > 0; + } + + public int getMemPressure() throws IOException { + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE)).get(0); + return Integer.parseInt(output); + } + + public void setMemPressureEnabled(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED), String.valueOf(flag ? 1 : 0)); + } + + public boolean isMemPressureEnabled() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED)).get(0)); + return output > 0; + } + + public void setMemSpreadPage(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE), String.valueOf(flag ? 1 : 0)); + } + + public boolean isMemSpreadPage() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE)).get(0)); + return output > 0; + } + + public void setMemSpreadSlab(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB), String.valueOf(flag ? 1 : 0)); + } + + public boolean isMemSpreadSlab() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB)).get(0)); + return output > 0; + } + + public void setSchedLoadBlance(boolean flag) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE), String.valueOf(flag ? 
1 : 0)); + } + + public boolean isSchedLoadBlance() throws IOException { + int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE)).get(0)); + return output > 0; + } + + public void setSchedRelaxDomainLevel(int value) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), String.valueOf(value)); + } + + public int getSchedRelaxDomainLevel() throws IOException { + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0); + return Integer.parseInt(output); + } + + public static int[] parseNums(String outputStr) { + char[] output = outputStr.toCharArray(); + LinkedList<Integer> numList = new LinkedList<Integer>(); + int value = 0; + int start = 0; + boolean isHyphen = false; + for (char ch : output) { + if (ch == ',') { + if (isHyphen) { + for (; start <= value; start++) { + numList.add(start); + } + isHyphen = false; + } else { + numList.add(value); + } + value = 0; + } else if (ch == '-') { + isHyphen = true; + start = value; + value = 0; + } else { + value = value * 10 + (ch - '0'); + } + } + if (output[output.length - 1] != ',') { + if (isHyphen) { + for (; start <= value; start++) { + numList.add(start); + } + } else { + numList.add(value); + } + } + + int[] nums = new int[numList.size()]; + int index = 0; + for (int num : numList) { + nums[index] = num; + index++; + } + return nums; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java new file mode 100755 index 0000000..a6896c5 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java @@ -0,0 
+1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; +import org.apache.storm.container.cgroup.Device; + +import java.io.IOException; +import java.util.List; + +public class DevicesCore implements CgroupCore { + + private final String dir; + + public static final String DEVICES_ALLOW = "/devices.allow"; + public static final String DEVICES_DENY = "/devices.deny"; + public static final String DEVICES_LIST = "/devices.list"; + + public static final char TYPE_ALL = 'a'; + public static final char TYPE_BLOCK = 'b'; + public static final char TYPE_CHAR = 'c'; + + public static final int ACCESS_READ = 1; + public static final int ACCESS_WRITE = 2; + public static final int ACCESS_CREATE = 4; + + public static final char ACCESS_READ_CH = 'r'; + public static final char ACCESS_WRITE_CH = 'w'; + public static final char ACCESS_CREATE_CH = 'm'; + + public DevicesCore(String dir) { + this.dir = dir; + } + + @Override + public SubSystemType getType() { + return SubSystemType.devices; + } + + public static 
class Record { + Device device; + char type; + int accesses; + + public Record(char type, Device device, int accesses) { + this.type = type; + this.device = device; + this.accesses = accesses; + } + + public Record(String output) { + if (output.contains("*")) { + System.out.println("Pre:" + output); + output = output.replaceAll("\\*", "-1"); + System.out.println("After:" + output); + } + String[] splits = output.split("[: ]"); + type = splits[0].charAt(0); + int major = Integer.parseInt(splits[1]); + int minor = Integer.parseInt(splits[2]); + device = new Device(major, minor); + accesses = 0; + for (char c : splits[3].toCharArray()) { + if (c == ACCESS_READ_CH) { + accesses |= ACCESS_READ; + } + if (c == ACCESS_CREATE_CH) { + accesses |= ACCESS_CREATE; + } + if (c == ACCESS_WRITE_CH) { + accesses |= ACCESS_WRITE; + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(type); + sb.append(' '); + sb.append(device.major); + sb.append(':'); + sb.append(device.minor); + sb.append(' '); + sb.append(getAccessesFlag(accesses)); + + return sb.toString(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + accesses; + result = prime * result + ((device == null) ? 
0 : device.hashCode()); + result = prime * result + type; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Record other = (Record) obj; + if (accesses != other.accesses) { + return false; + } + if (device == null) { + if (other.device != null) { + return false; + } + } else if (!device.equals(other.device)) { + return false; + } + if (type != other.type) { + return false; + } + return true; + } + + public static Record[] parseRecordList(List<String> output) { + Record[] records = new Record[output.size()]; + for (int i = 0, l = output.size(); i < l; i++) { + records[i] = new Record(output.get(i)); + } + + return records; + } + + public static StringBuilder getAccessesFlag(int accesses) { + StringBuilder sb = new StringBuilder(); + if ((accesses & ACCESS_READ) != 0) { + sb.append(ACCESS_READ_CH); + } + if ((accesses & ACCESS_WRITE) != 0) { + sb.append(ACCESS_WRITE_CH); + } + if ((accesses & ACCESS_CREATE) != 0) { + sb.append(ACCESS_CREATE_CH); + } + return sb; + } + } + + private void setPermission(String prop, char type, Device device, int accesses) throws IOException { + Record record = new Record(type, device, accesses); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop), record.toString()); + } + + public void setAllow(char type, Device device, int accesses) throws IOException { + setPermission(DEVICES_ALLOW, type, device, accesses); + } + + public void setDeny(char type, Device device, int accesses) throws IOException { + setPermission(DEVICES_DENY, type, device, accesses); + } + + public Record[] getList() throws IOException { + List<String> output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, DEVICES_LIST)); + return Record.parseRecordList(output); + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java new file mode 100755 index 0000000..65b8989 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; + +import java.io.IOException; + +public class FreezerCore implements CgroupCore { + + public static final String FREEZER_STATE = "/freezer.state"; + + private final String dir; + + public FreezerCore(String dir) { + this.dir = dir; + } + + @Override + public SubSystemType getType() { + return SubSystemType.freezer; + } + + public void setState(State state) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase()); + } + + public State getState() throws IOException { + return State.getStateValue(CgroupUtils.readFileByLine(Constants.getDir(this.dir, FREEZER_STATE)).get(0)); + } + + public enum State { + frozen, freezing, thawed; + + public static State getStateValue(String state) { + if (state.equals("FROZEN")) { + return frozen; + } + else if (state.equals("FREEZING")) { + return freezing; + } + else if (state.equals("THAWED")) { + return thawed; + } + else { + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java new file mode 100755 index 0000000..98be198 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; + +import java.io.IOException; + +public class MemoryCore implements CgroupCore { + + public static final String MEMORY_STAT = "/memory.stat"; + public static final String MEMORY_USAGE_IN_BYTES = "/memory.usage_in_bytes"; + public static final String MEMORY_MEMSW_USAGE_IN_BYTES = "/memory.memsw.usage_in_bytes"; + public static final String MEMORY_MAX_USAGE_IN_BYTES = "/memory.max_usage_in_bytes"; + public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES = "/memory.memsw.max_usage_in_bytes"; + public static final String MEMORY_LIMIT_IN_BYTES = "/memory.limit_in_bytes"; + public static final String MEMORY_MEMSW_LIMIT_IN_BYTES = "/memory.memsw.limit_in_bytes"; + public static final String MEMORY_FAILCNT = "/memory.failcnt"; + public static final String MEMORY_MEMSW_FAILCNT = "/memory.memsw.failcnt"; + public static final String MEMORY_FORCE_EMPTY = "/memory.force_empty"; + public static final String MEMORY_SWAPPINESS = "/memory.swappiness"; + public static final String MEMORY_USE_HIERARCHY = "/memory.use_hierarchy"; + public static final String MEMORY_OOM_CONTROL = "/memory.oom_control"; + + private final String dir; + + public MemoryCore(String dir) { + this.dir = dir; + 
} + + @Override + public SubSystemType getType() { + return SubSystemType.memory; + } + + public static class Stat { + public final long cacheSize; + public final long rssSize; + public final long mappedFileSize; + public final long pgpginNum; + public final long pgpgoutNum; + public final long swapSize; + public final long activeAnonSize; + public final long inactiveAnonSize; + public final long activeFileSize; + public final long inactiveFileSize; + public final long unevictableSize; + public final long hierarchicalMemoryLimitSize; + public final long hierarchicalMemSwapLimitSize; + public final long totalCacheSize; + public final long totalRssSize; + public final long totalMappedFileSize; + public final long totalPgpginNum; + public final long totalPgpgoutNum; + public final long totalSwapSize; + public final long totalActiveAnonSize; + public final long totalInactiveAnonSize; + public final long totalActiveFileSize; + public final long totalInactiveFileSize; + public final long totalUnevictableSize; + public final long totalHierarchicalMemoryLimitSize; + public final long totalHierarchicalMemSwapLimitSize; + + public Stat(String output) { + String[] splits = output.split("\n"); + this.cacheSize = Long.parseLong(splits[0]); + this.rssSize = Long.parseLong(splits[1]); + this.mappedFileSize = Long.parseLong(splits[2]); + this.pgpginNum = Long.parseLong(splits[3]); + this.pgpgoutNum = Long.parseLong(splits[4]); + this.swapSize = Long.parseLong(splits[5]); + this.inactiveAnonSize = Long.parseLong(splits[6]); + this.activeAnonSize = Long.parseLong(splits[7]); + this.inactiveFileSize = Long.parseLong(splits[8]); + this.activeFileSize = Long.parseLong(splits[9]); + this.unevictableSize = Long.parseLong(splits[10]); + this.hierarchicalMemoryLimitSize = Long.parseLong(splits[11]); + this.hierarchicalMemSwapLimitSize = Long.parseLong(splits[12]); + this.totalCacheSize = Long.parseLong(splits[13]); + this.totalRssSize = Long.parseLong(splits[14]); + 
this.totalMappedFileSize = Long.parseLong(splits[15]);
+            this.totalPgpginNum = Long.parseLong(splits[16]);
+            this.totalPgpgoutNum = Long.parseLong(splits[17]);
+            this.totalSwapSize = Long.parseLong(splits[18]);
+            this.totalInactiveAnonSize = Long.parseLong(splits[19]);
+            this.totalActiveAnonSize = Long.parseLong(splits[20]);
+            this.totalInactiveFileSize = Long.parseLong(splits[21]);
+            this.totalActiveFileSize = Long.parseLong(splits[22]);
+            this.totalUnevictableSize = Long.parseLong(splits[23]);
+            this.totalHierarchicalMemoryLimitSize = Long.parseLong(splits[24]);
+            this.totalHierarchicalMemSwapLimitSize = Long.parseLong(splits[25]);
+        }
+    }
+
+    /**
+     * Builds a {@link Stat} from the MEMORY_STAT file of this cgroup directory.
+     * NOTE(review): only the first entry returned by readFileByLine is handed to Stat, while the
+     * Stat constructor indexes splits[15..25] - confirm that the first entry carries every field.
+     *
+     * @return the parsed statistics
+     * @throws IOException declared for the underlying file read
+     */
+    public Stat getStat() throws IOException {
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_STAT)).get(0);
+        Stat stat = new Stat(output);
+        return stat;
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_USAGE_IN_BYTES file and parses it as a long.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public long getPhysicalUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_MEMSW_USAGE_IN_BYTES file and parses it as a long.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public long getWithSwapUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_MAX_USAGE_IN_BYTES file and parses it as a long.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public long getMaxPhysicalUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_MEMSW_MAX_USAGE_IN_BYTES file and parses it as a long.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public long getMaxWithSwapUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)).get(0));
+    }
+
+    /**
+     * Writes {@code value}, rendered as a decimal string, to the MEMORY_LIMIT_IN_BYTES file.
+     *
+     * @param value the limit to write
+     * @throws IOException declared for the underlying file write
+     */
+    public void setPhysicalUsageLimit(long value) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), String.valueOf(value));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_LIMIT_IN_BYTES file and parses it as a long.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public long getPhysicalUsageLimit() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
+    }
+
+    /**
+     * Writes {@code value}, rendered as a decimal string, to the MEMORY_MEMSW_LIMIT_IN_BYTES file.
+     *
+     * @param value the limit to write
+     * @throws IOException declared for the underlying file write
+     */
+    public void setWithSwapUsageLimit(long value) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), String.valueOf(value));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_MEMSW_LIMIT_IN_BYTES file and parses it as a long.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public long getWithSwapUsageLimit() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_FAILCNT file and parses it as an int.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public int getPhysicalFailCount() throws IOException {
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_MEMSW_FAILCNT file and parses it as an int.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public int getWithSwapFailCount() throws IOException {
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
+    }
+
+    /**
+     * Writes the string "0" to the MEMORY_FORCE_EMPTY file.
+     *
+     * @throws IOException declared for the underlying file write
+     */
+    public void clearForceEmpty() throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_FORCE_EMPTY), String.valueOf(0));
+    }
+
+    /**
+     * Writes {@code value}, rendered as a decimal string, to the MEMORY_SWAPPINESS file.
+     *
+     * @param value the swappiness to write
+     * @throws IOException declared for the underlying file write
+     */
+    public void setSwappiness(int value) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS), String.valueOf(value));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_SWAPPINESS file and parses it as an int.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public int getSwappiness() throws IOException {
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
+    }
+
+    /**
+     * Writes "1" (flag set) or "0" (flag clear) to the MEMORY_USE_HIERARCHY file.
+     *
+     * @param flag the value to encode as 1 or 0
+     * @throws IOException declared for the underlying file write
+     */
+    public void setUseHierarchy(boolean flag) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY), String.valueOf(flag ? 1 : 0));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_USE_HIERARCHY file as an int.
+     *
+     * @return {@code true} when the parsed value is greater than zero
+     * @throws IOException declared for the underlying file read
+     */
+    public boolean isUseHierarchy() throws IOException {
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY)).get(0));
+        return output > 0;
+    }
+
+    /**
+     * Writes "1" (flag set) or "0" (flag clear) to the MEMORY_OOM_CONTROL file.
+     *
+     * @param flag the value to encode as 1 or 0
+     * @throws IOException declared for the underlying file write
+     */
+    public void setOomControl(boolean flag) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL), String.valueOf(flag ? 1 : 0));
+    }
+
+    /**
+     * Reads the first entry of the MEMORY_OOM_CONTROL file, which is expected to be a
+     * name/value pair separated by whitespace, and parses the second token as an int.
+     *
+     * @return {@code true} when the parsed value is greater than zero
+     * @throws IOException declared for the underlying file read
+     */
+    public boolean isOomControl() throws IOException {
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
+        // Keep only the text before any newline, then take the token after the first whitespace character.
+        output = output.split("\n")[0].split("[\\s]")[1];
+        int value = Integer.parseInt(output);
+        return value > 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
new file mode 100755
index 0000000..979eaad
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; +import org.apache.storm.container.cgroup.Device; + +import java.io.IOException; + +public class NetClsCore implements CgroupCore { + + public static final String NET_CLS_CLASSID = "/net_cls.classid"; + + private final String dir; + + public NetClsCore(String dir) { + this.dir = dir; + } + + @Override + public SubSystemType getType() { + return SubSystemType.net_cls; + } + + private StringBuilder toHex(int num) { + String hex = num + ""; + StringBuilder sb = new StringBuilder(); + int l = hex.length(); + if (l > 4) { + hex = hex.substring(l - 4 - 1, l); + } + for (; l < 4; l++) { + sb.append('0'); + } + sb.append(hex); + return sb; + } + + public void setClassId(int major, int minor) throws IOException { + StringBuilder sb = new StringBuilder("0x"); + sb.append(toHex(major)); + sb.append(toHex(minor)); + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString()); + } + + public Device getClassId() throws IOException { + String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0); + output = Integer.toHexString(Integer.parseInt(output)); + int major = Integer.parseInt(output.substring(0, output.length() - 4)); + int minor = Integer.parseInt(output.substring(output.length() - 4)); + return new Device(major, minor); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java new file mode 100755 index 0000000..95c1a40 --- /dev/null +++ 
b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.Constants;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Accessor for the net_prio.prioidx and net_prio.ifpriomap files of a cgroup directory.
+ */
+public class NetPrioCore implements CgroupCore {
+
+    public static final String NET_PRIO_PRIOIDX = "/net_prio.prioidx";
+    public static final String NET_PRIO_IFPRIOMAP = "/net_prio.ifpriomap";
+
+    private final String dir;
+
+    /**
+     * @param dir the cgroup directory that holds the net_prio files
+     */
+    public NetPrioCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.net_prio;
+    }
+
+    /**
+     * Reads the first entry of the net_prio.prioidx file and parses it as an int.
+     *
+     * @throws IOException declared for the underlying file read
+     */
+    public int getPrioId() throws IOException {
+        String prioIdxFile = Constants.getDir(this.dir, NET_PRIO_PRIOIDX);
+        String firstEntry = CgroupUtils.readFileByLine(prioIdxFile).get(0);
+        return Integer.parseInt(firstEntry);
+    }
+
+    /**
+     * Writes one "iface priority" entry to the net_prio.ifpriomap file.
+     *
+     * @param iface name placed before the space
+     * @param priority value placed after the space
+     * @throws IOException declared for the underlying file write
+     */
+    public void setIfPrioMap(String iface, int priority) throws IOException {
+        String entry = iface + " " + priority;
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), entry);
+    }
+
+    /**
+     * Reads every entry of the net_prio.ifpriomap file into a map. Each entry is split on a
+     * single space; the first token becomes the key and the second token the Integer value.
+     *
+     * @return map from first token to parsed second token
+     * @throws IOException declared for the underlying file read
+     */
+    public Map<String, Integer> getIfPrioMap() throws IOException {
+        Map<String, Integer> priorities = new HashMap<String, Integer>();
+        List<String> entries = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP));
+        for (int i = 0; i < entries.size(); i++) {
+            String[] fields = entries.get(i).split(" ");
+            String iface = fields[0];
+            Integer priority = Integer.valueOf(fields[1]);
+            priorities.put(iface, priority);
+        }
+        return priorities;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index a0c0b1a..adaafb6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -52,7 +52,6 @@ import org.apache.thrift.TSerializer;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
-import org.eclipse.jetty.util.log.Log;
 import org.json.simple.JSONValue;
 import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
@@ -1454,7 +1453,7 @@ public class Utils {
      * @return boolean whether or not the directory exists in the zip.
*/ public static boolean zipDoesContainDir(String zipfile, String target) throws IOException { - List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new ZipFile(zipfile).entries()); + List<ZipEntry> entries = (List<ZipEntry>) Collections.list(new ZipFile(zipfile).entries()); String targetDir = target + "/"; for(ZipEntry entry : entries) { http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/test/clj/org/apache/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj index 9c31ddf..956abe8 100644 --- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj +++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj @@ -22,7 +22,7 @@ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout]) (:import [org.apache.storm.scheduler ISupervisor]) (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils]) - (:import [org.apache.storm.generated RebalanceOptions]) + (:import [org.apache.storm.generated RebalanceOptions WorkerResources]) (:import [org.mockito Matchers Mockito]) (:import [java.util UUID]) (:import [java.io File]) @@ -291,7 +291,6 @@ (let [mock-port "42" mock-storm-id "fake-storm-id" mock-worker-id "fake-worker-id" - mock-mem-onheap 512 mock-cp (str Utils/FILE_PATH_SEPARATOR "base" Utils/CLASS_PATH_SEPARATOR Utils/FILE_PATH_SEPARATOR "stormjar.jar") mock-sensitivity "S3" mock-cp "/base:/stormjar.jar" @@ -358,7 +357,7 @@ mock-storm-id mock-port mock-worker-id - mock-mem-onheap) + (WorkerResources.)) (. (Mockito/verify utils-spy) (launchProcessImpl (Matchers/eq exp-args) (Matchers/any) @@ -394,7 +393,7 @@ mock-storm-id mock-port mock-worker-id - mock-mem-onheap) + (WorkerResources.)) (. 
(Mockito/verify utils-spy) (launchProcessImpl (Matchers/eq exp-args) (Matchers/any) @@ -428,7 +427,7 @@ mock-storm-id mock-port mock-worker-id - mock-mem-onheap) + (WorkerResources.)) (. (Mockito/verify utils-spy) (launchProcessImpl (Matchers/eq exp-args) (Matchers/any) @@ -462,7 +461,7 @@ mock-storm-id mock-port mock-worker-id - mock-mem-onheap) + (WorkerResources.)) (. (Mockito/verify utils-spy) (launchProcessImpl (Matchers/any) (Matchers/eq full-env) @@ -475,7 +474,6 @@ (let [mock-port "42" mock-storm-id "fake-storm-id" mock-worker-id "fake-worker-id" - mock-mem-onheap 512 mock-sensitivity "S3" mock-cp "mock-classpath'quote-on-purpose" attrs (make-array FileAttribute 0) @@ -554,7 +552,7 @@ mock-storm-id mock-port mock-worker-id - mock-mem-onheap) + (WorkerResources.)) (. (Mockito/verify utils-spy) (launchProcessImpl (Matchers/eq exp-launch) (Matchers/any) @@ -596,7 +594,7 @@ mock-storm-id mock-port mock-worker-id - mock-mem-onheap) + (WorkerResources.)) (. (Mockito/verify utils-spy) (launchProcessImpl (Matchers/eq exp-launch) (Matchers/any) http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/test/jvm/org/apache/storm/TestCgroups.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/TestCgroups.java b/storm-core/test/jvm/org/apache/storm/TestCgroups.java new file mode 100644 index 0000000..f19ffc2 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/TestCgroups.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Unit tests for CGroups
+ */
+public class TestCgroups {
+
+    /**
+     * Test whether cgroups are set up correctly for use. Also tests whether Cgroups produces the right command to
+     * start a worker and cleans up correctly after the worker is shut down.
+     */
+    @Test
+    public void testSetupAndTearDown() throws IOException {
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        // We don't want to run the test if CGroups are not set up. Boolean.TRUE.equals is null-safe, so a
+        // missing setting skips the test instead of failing it with a NullPointerException on unboxing.
+        Assume.assumeTrue("Check if CGroups are setup", Boolean.TRUE.equals(config.get(Config.STORM_CGROUP_ENABLE)));
+
+        Assert.assertTrue("Check if STORM_CGROUP_HIERARCHY_DIR exists", stormCgroupHierarchyExists(config));
+        Assert.assertTrue("Check if STORM_SUPERVISOR_CGROUP_ROOTDIR exists", stormCgroupSupervisorRootDirExists(config));
+
+        CgroupManager manager = new CgroupManager();
+        manager.prepare(config);
+
+        Map<String, Object> resourcesMap = new HashMap<String, Object>();
+        resourcesMap.put("cpu", 200);
+        resourcesMap.put("memory", 1024);
+        String workerId = UUID.randomUUID().toString();
+        String command = manager.startNewWorker(workerId, resourcesMap);
+
+        // The subsystem order inside the "-g" argument is not fixed, so both orderings are accepted.
+        String correctCommand1 = config.get(Config.STORM_CGROUP_CGEXEC_CMD) + " -g memory,cpu:/"
+                + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId;
+        String correctCommand2 = config.get(Config.STORM_CGROUP_CGEXEC_CMD) + " -g cpu,memory:/"
+                + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId;
+        Assert.assertTrue("Check if cgroup launch command is correct",
+                command.equals(correctCommand1) || command.equals(correctCommand2));
+
+        String pathToWorkerCgroupDir = ((String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR))
+                + "/" + ((String) config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR)) + "/" + workerId;
+
+        Assert.assertTrue("Check if cgroup directory exists for worker", dirExists(pathToWorkerCgroupDir));
+
+        /* validate cpu settings */
+
+        String pathToCpuShares = pathToWorkerCgroupDir + "/cpu.shares";
+        Assert.assertTrue("Check if cpu.shares file exists", fileExists(pathToCpuShares));
+        Assert.assertEquals("Check if the correct value is written into cpu.shares", "200", readFileAll(pathToCpuShares));
+
+        /* validate memory settings */
+
+        String pathTomemoryLimitInBytes = pathToWorkerCgroupDir + "/memory.limit_in_bytes";
+
+        Assert.assertTrue("Check if memory.limit_in_bytes file exists", fileExists(pathTomemoryLimitInBytes));
+        // The "memory" resource of 1024 is expected to show up as 1024 * 1024 * 1024 in memory.limit_in_bytes.
+        Assert.assertEquals("Check if the correct value is written into memory.limit_in_bytes",
+                String.valueOf(1024 * 1024 * 1024), readFileAll(pathTomemoryLimitInBytes));
+
+        manager.shutDownWorker(workerId, true);
+
+        Assert.assertFalse("Make sure cgroup was removed properly", dirExists(pathToWorkerCgroupDir));
+    }
+
+    /**
+     * @return whether the directory named by STORM_CGROUP_HIERARCHY_DIR exists
+     */
+    private boolean stormCgroupHierarchyExists(Map<String, Object> config) {
+        String pathToStormCgroupHierarchy = (String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR);
+        return dirExists(pathToStormCgroupHierarchy);
+    }
+
+    /**
+     * @return whether STORM_SUPERVISOR_CGROUP_ROOTDIR exists as a directory below STORM_CGROUP_HIERARCHY_DIR
+     */
+    private boolean stormCgroupSupervisorRootDirExists(Map<String, Object> config) {
+        String pathTostormCgroupSupervisorRootDir = ((String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR))
+                + "/" + ((String) config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR));
+
+        return dirExists(pathTostormCgroupSupervisorRootDir);
+    }
+
+    /**
+     * @return whether {@code rawPath} exists and is a directory
+     */
+    private boolean dirExists(String rawPath) {
+        File path = new File(rawPath);
+        return path.exists() && path.isDirectory();
+    }
+
+    /**
+     * @return whether {@code rawPath} exists and is not a directory
+     */
+    private boolean fileExists(String rawPath) {
+        File path = new File(rawPath);
+        return path.exists() && !path.isDirectory();
+    }
+
+    /**
+     * Reads the whole file and returns its content with surrounding whitespace trimmed.
+     * The bytes are decoded as UTF-8 explicitly so the result does not depend on the platform charset.
+     */
+    private String readFileAll(String filePath) throws IOException {
+        byte[] data = Files.readAllBytes(Paths.get(filePath));
+        return new String(data, StandardCharsets.UTF_8).trim();
+    }
+}
