[STORM-1336] - Evaluate/Port JStorm cgroup support
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc063ecc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc063ecc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc063ecc Branch: refs/heads/master Commit: fc063ecc3e1fb6a3d50b9cf27c756db0baf2e6c3 Parents: 9ddd29f Author: Boyang Jerry Peng <[email protected]> Authored: Fri Feb 12 10:38:34 2016 -0600 Committer: Boyang Jerry Peng <[email protected]> Committed: Fri Feb 12 10:38:34 2016 -0600 ---------------------------------------------------------------------- conf/cgconfig.conf.example | 41 +++ conf/defaults.yaml | 13 + .../clj/org/apache/storm/daemon/supervisor.clj | 50 +++- storm-core/src/jvm/org/apache/storm/Config.java | 78 ++++++ .../container/ResourceIsolationInterface.java | 43 +++ .../storm/container/cgroup/CgroupCenter.java | 232 +++++++++++++++++ .../storm/container/cgroup/CgroupCommon.java | 226 ++++++++++++++++ .../container/cgroup/CgroupCommonOperation.java | 82 ++++++ .../container/cgroup/CgroupCoreFactory.java | 75 ++++++ .../storm/container/cgroup/CgroupManager.java | 177 +++++++++++++ .../storm/container/cgroup/CgroupOperation.java | 46 ++++ .../storm/container/cgroup/CgroupUtils.java | 133 ++++++++++ .../storm/container/cgroup/Constants.java | 30 +++ .../apache/storm/container/cgroup/Device.java | 72 ++++++ .../storm/container/cgroup/Hierarchy.java | 117 +++++++++ .../storm/container/cgroup/SubSystem.java | 78 ++++++ .../storm/container/cgroup/SubSystemType.java | 58 +++++ .../storm/container/cgroup/SystemOperation.java | 65 +++++ .../storm/container/cgroup/core/BlkioCore.java | 259 +++++++++++++++++++ .../storm/container/cgroup/core/CgroupCore.java | 26 ++ .../storm/container/cgroup/core/CpuCore.java | 136 ++++++++++ .../container/cgroup/core/CpuacctCore.java | 72 ++++++ .../storm/container/cgroup/core/CpusetCore.java | 212 +++++++++++++++ .../container/cgroup/core/DevicesCore.java | 186 +++++++++++++ 
.../container/cgroup/core/FreezerCore.java | 67 +++++ .../storm/container/cgroup/core/MemoryCore.java | 189 ++++++++++++++ .../storm/container/cgroup/core/NetClsCore.java | 70 +++++ .../container/cgroup/core/NetPrioCore.java | 66 +++++ .../src/jvm/org/apache/storm/utils/Utils.java | 3 +- .../clj/org/apache/storm/supervisor_test.clj | 16 +- .../test/jvm/org/apache/storm/TestCgroups.java | 118 +++++++++ 31 files changed, 3017 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/conf/cgconfig.conf.example ---------------------------------------------------------------------- diff --git a/conf/cgconfig.conf.example b/conf/cgconfig.conf.example new file mode 100644 index 0000000..555b83a --- /dev/null +++ b/conf/cgconfig.conf.example @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +mount { + cpuset = /cgroup/cpuset; + cpu = /cgroup/storm_resources; + cpuacct = /cgroup/cpuacct; + memory = /cgroup/storm_resources; + devices = /cgroup/devices; + freezer = /cgroup/freezer; + net_cls = /cgroup/net_cls; + blkio = /cgroup/blkio; +} + +group storm { + perm { + task { + uid = 500; + gid = 500; + } + admin { + uid = 500; + gid = 500; + } + } + cpu { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index d381f0d..e32e6f7 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -285,3 +285,16 @@ pacemaker.kerberos.users: [] #default storm daemon metrics reporter plugins storm.daemon.metrics.reporter.plugins: - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter" + +storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager" + +# Configs for CGroup support +storm.cgroup.hierarchy.dir: "/cgroup/storm_resources" +storm.cgroup.resources: + - cpu + - memory +storm.cgroup.hierarchy.name: "storm" +# Also determines whether the unit tests for cgroup runs. 
If cgroup.enable is set to false the unit tests for cgroups will not run +storm.cgroup.enable: false +storm.supervisor.cgroup.rootdir: "storm" +storm.cgroup.cgexec.cmd: "/bin/cgexec" http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index ae9e92f..97f2825 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -43,7 +43,9 @@ (:require [metrics.gauges :refer [defgauge]]) (:require [metrics.meters :refer [defmeter mark!]]) (:gen-class - :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]])) + :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]]) + (:import [org.apache.storm.container.cgroup CgroupManager]) + (:require [clojure.string :as str])) (defmeter supervisor:num-workers-launched) @@ -307,7 +309,9 @@ (log-debug "Removing path " path) (.delete (File. path)) (catch Exception e))))) ;; on windows, the supervisor may still holds the lock on the worker directory - (try-cleanup-worker conf id)) + (try-cleanup-worker conf id) + (if (conf STORM-CGROUP-ENABLE) + (.shutDownWorker (:cgroup-manager supervisor) id false))) (log-message "Shut down " (:supervisor-id supervisor) ":" id)) (def SUPERVISOR-ZK-ACLS @@ -350,6 +354,12 @@ :sync-retry (atom 0) :download-lock (Object.) :stormid->profiler-actions (atom {}) + :cgroup-manager (if (conf STORM-CGROUP-ENABLE) + (let [cgroup-manager (.newInstance (Class/forName (conf STORM-RESOURCE-ISOLATION-PLUGIN)))] + (.prepare cgroup-manager conf) + (log-message "Using resource isolation plugin " (conf STORM-RESOURCE-ISOLATION-PLUGIN)) + cgroup-manager) + nil) }) (defn required-topo-files-exist? 
@@ -388,7 +398,7 @@ (:storm-id assignment) port id - mem-onheap) + resources) [id port]) (do (log-message "Missing topology storm code, so can't launch worker with assignment " @@ -1054,7 +1064,7 @@ (Utils/createSymlink worker-dir topo-dir "artifacts" (str port))))) (defmethod launch-worker - :distributed [supervisor storm-id port worker-id mem-onheap] + :distributed [supervisor storm-id port worker-id resources] (let [conf (:conf supervisor) run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER) storm-home (System/getProperty "storm.home") @@ -1078,9 +1088,15 @@ (Utils/addToClasspath [stormjar]) (Utils/addToClasspath topo-classpath)) top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS) - mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not zero - (int (Math/ceil mem-onheap)) ;; round up + mem-onheap (if (and (.get_mem_on_heap resources) (> (.get_mem_on_heap resources) 0)) ;; not nil and not zero + (int (Math/ceil (.get_mem_on_heap resources))) ;; round up (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use default value + mem-offheap (if (.get_mem_off_heap resources) + (int (Math/ceil (.get_mem_off_heap resources))) ;; round up + 0) + + cpu (int (Math/ceil (.get_cpu resources))) + gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap) topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS) user (storm-conf TOPOLOGY-SUBMITTER-USER) @@ -1104,8 +1120,26 @@ (str "file:///" storm-log4j2-conf-dir)) storm-log4j2-conf-dir) Utils/FILE_PATH_SEPARATOR "worker.xml") + + cgroup-command (if (conf STORM-CGROUP-ENABLE) + (str/split + (.startNewWorker (:cgroup-manager supervisor) worker-id + (merge + ;; The manually set CGROUP-WORKER-CPU-LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) + (cond + (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT) {"memory" (conf STORM-WORKER-CGROUP-MEMORY-MB-LIMIT)} + (+ mem-onheap mem-offheap) 
{"memory" (+ mem-onheap mem-offheap)} + :else nil) + ;; The manually set CGROUP-WORKER-CPU-LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) + (cond + (conf STORM-WORKER-CGROUP-CPU-LIMIT) {"cpu" (conf STORM-WORKER-CGROUP-CPU-LIMIT)} + (not= cpu nil) {"cpu" cpu} + :else nil))) #" ")) + command (concat - [(java-cmd) "-cp" classpath + (if (conf STORM-CGROUP-ENABLE) + cgroup-command) + [(java-cmd) "-cp" classpath topo-worker-logwriter-childopts (str "-Dlogfile.name=" logfilename) (str "-Dstorm.home=" storm-home) @@ -1200,7 +1234,7 @@ (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir))))))) (defmethod launch-worker - :local [supervisor storm-id port worker-id mem-onheap] + :local [supervisor storm-id port worker-id resources] (let [conf (:conf supervisor) pid (Utils/uuid) worker (worker/mk-worker conf http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 74231a0..a5c1ea0 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -17,6 +17,7 @@ */ package org.apache.storm; +import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; @@ -2194,6 +2195,63 @@ public class Config extends HashMap<String, Object> { @isString public static final Object CLIENT_JAR_TRANSFORMER = "client.jartransformer.class"; + + @isImplementationOfClass(implementsClass = ResourceIsolationInterface.class) + public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = 
"storm.resource.isolation.plugin"; + + /** + * CGroup Setting below + */ + + /** + * root directory of the storm cgroup hierarchy + */ + @isString + public static final Object STORM_CGROUP_HIERARCHY_DIR = "storm.cgroup.hierarchy.dir"; + + /** + * resources to to be controlled by cgroups + */ + @isStringList + public static final Object STORM_CGROUP_RESOURCES = "storm.cgroup.resources"; + + /** + * name for the cgroup hierarchy + */ + @isString + public static final Object STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name"; + + /** + * flag to determine whether to use cgroups + */ + @isBoolean + public static final String STORM_CGROUP_ENABLE = "storm.cgroup.enable"; + + /** + * root directory for cgoups + */ + @isString + public static String STORM_SUPERVISOR_CGROUP_ROOTDIR = "storm.supervisor.cgroup.rootdir"; + + /** + * the manually set memory limit (in MB) for each CGroup on supervisor node + */ + @isPositiveNumber + public static String STORM_WORKER_CGROUP_MEMORY_MB_LIMIT = "storm.worker.cgroup.memory.mb.limit"; + + /** + * the manually set cpu share for each CGroup on supervisor node + */ + @isPositiveNumber + public static String STORM_WORKER_CGROUP_CPU_LIMIT = "storm.worker.cgroup.cpu.limit"; + + /** + * full path to cgexec command + */ + @isString + public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd"; + + public static void setClasspath(Map conf, String cp) { conf.put(Config.TOPOLOGY_CLASSPATH, cp); } @@ -2406,4 +2464,24 @@ public class Config extends HashMap<String, Object> { this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, clazz.getName()); } } + + public static String getCgroupRootDir(Map conf) { + return (String) conf.get(STORM_SUPERVISOR_CGROUP_ROOTDIR); + } + + public static String getCgroupStormHierarchyDir(Map conf) { + return (String) conf.get(Config.STORM_CGROUP_HIERARCHY_DIR); + } + + public static ArrayList<String> getCgroupStormResources(Map conf) { + ArrayList<String> ret = new ArrayList<String>(); + for (String entry 
: ((Iterable<String>) conf.get(Config.STORM_CGROUP_RESOURCES))) { + ret.add(entry); + } + return ret; + } + + public static String getCgroupStormHierarchyName(Map conf) { + return (String) conf.get(Config.STORM_CGROUP_HIERARCHY_NAME); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java new file mode 100644 index 0000000..8e52bc7 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.container; + +import java.util.Map; + +/** + * A plugin to support resource isolation and limitation within Storm + */ +public interface ResourceIsolationInterface { + + /** + * @param workerId worker id of the worker to start + * @param resources set of resources to limit + * @return a String that includes to command on how to start the worker. 
The string returned from this function + * will be concatenated to the front of the command to launch logwriter/worker in supervisor.clj + */ + public String startNewWorker(String workerId, Map resources); + + /** + * This function will be called when the worker needs to shutdown. This function should include logic to clean up after a worker is shutdown + * @param workerId worker id to shutdown and clean up after + * @param isKilled whether to actually kill worker + */ + public void shutDownWorker(String workerId, boolean isKilled); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java new file mode 100644 index 0000000..f7e7f69 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CgroupCenter implements CgroupOperation { + + private static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class); + + private static CgroupCenter instance; + + private CgroupCenter() { + + } + + /** + * Thread unsafe + * + * @return + */ + public synchronized static CgroupCenter getInstance() { + if (instance == null) { + instance = new CgroupCenter(); + } + return CgroupUtils.enabled() ? instance : null; + } + + @Override + public List<Hierarchy> getHierarchies() { + + Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>(); + + try (FileReader reader = new FileReader(Constants.MOUNT_STATUS_FILE); + BufferedReader br = new BufferedReader(reader)) { + String str = null; + while ((str = br.readLine()) != null) { + String[] strSplit = str.split(" "); + if (!strSplit[2].equals("cgroup")) { + continue; + } + String name = strSplit[0]; + String type = strSplit[3]; + String dir = strSplit[1]; + Hierarchy h = hierarchies.get(type); + h = new Hierarchy(name, CgroupUtils.analyse(type), dir); + hierarchies.put(type, h); + } + return new ArrayList<Hierarchy>(hierarchies.values()); + } catch (Exception e) { + LOG.error("Get hierarchies error {}", e); + } + return null; + } + + @Override + public Set<SubSystem> getSubSystems() { + + Set<SubSystem> subSystems = new HashSet<SubSystem>(); + + try (FileReader reader = new FileReader(Constants.CGROUP_STATUS_FILE); + BufferedReader br = new BufferedReader(reader)){ + String str = null; + while ((str = br.readLine()) != null) { + String[] split = str.split("\t"); + SubSystemType type = SubSystemType.getSubSystem(split[0]); + if 
(type == null) { + continue; + } + subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]) + , Integer.valueOf(split[3]).intValue() == 1 ? true : false)); + } + return subSystems; + } catch (Exception e) { + LOG.error("Get subSystems error {}", e); + } + return null; + } + + @Override + public boolean enabled(SubSystemType subsystem) { + + Set<SubSystem> subSystems = this.getSubSystems(); + for (SubSystem subSystem : subSystems) { + if (subSystem.getType() == subsystem) { + return true; + } + } + return false; + } + + @Override + public Hierarchy busy(SubSystemType subsystem) { + List<Hierarchy> hierarchies = this.getHierarchies(); + for (Hierarchy hierarchy : hierarchies) { + for (SubSystemType type : hierarchy.getSubSystems()) { + if (type == subsystem) { + return hierarchy; + } + } + } + return null; + } + + @Override + public Hierarchy busy(List<SubSystemType> subSystems) { + List<Hierarchy> hierarchies = this.getHierarchies(); + for (Hierarchy hierarchy : hierarchies) { + Hierarchy ret = hierarchy; + for (SubSystemType subsystem : subSystems) { + if (!hierarchy.getSubSystems().contains(subsystem)) { + ret = null; + break; + } + } + if (ret != null) { + return ret; + } + } + return null; + } + + @Override + public Hierarchy mounted(Hierarchy hierarchy) { + + List<Hierarchy> hierarchies = this.getHierarchies(); + if (CgroupUtils.dirExists(hierarchy.getDir())) { + for (Hierarchy h : hierarchies) { + if (h.equals(hierarchy)) { + return h; + } + } + } + return null; + } + + @Override + public void mount(Hierarchy hierarchy) throws IOException { + + if (this.mounted(hierarchy) != null) { + LOG.error("{} is mounted", hierarchy.getDir()); + return; + } + Set<SubSystemType> subsystems = hierarchy.getSubSystems(); + for (SubSystemType type : subsystems) { + if (this.busy(type) != null) { + LOG.error("subsystem: {} is busy", type.name()); + subsystems.remove(type); + } + } + if (subsystems.size() == 0) { + return; + } + if 
(!CgroupUtils.dirExists(hierarchy.getDir())) { + new File(hierarchy.getDir()).mkdirs(); + } + String subSystems = CgroupUtils.reAnalyse(subsystems); + SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems); + + } + + @Override + public void umount(Hierarchy hierarchy) throws IOException { + if (this.mounted(hierarchy) != null) { + hierarchy.getRootCgroups().delete(); + SystemOperation.umount(hierarchy.getDir()); + CgroupUtils.deleteDir(hierarchy.getDir()); + } + } + + @Override + public void create(CgroupCommon cgroup) throws SecurityException { + if (cgroup.isRoot()) { + LOG.error("You can't create rootCgroup in this function"); + return; + } + CgroupCommon parent = cgroup.getParent(); + while (parent != null) { + if (!CgroupUtils.dirExists(parent.getDir())) { + LOG.error(" {} is not existed", parent.getDir()); + return; + } + parent = parent.getParent(); + } + Hierarchy h = cgroup.getHierarchy(); + if (mounted(h) == null) { + LOG.error("{} is not mounted", h.getDir()); + return; + } + if (CgroupUtils.dirExists(cgroup.getDir())) { + LOG.error("{} is existed", cgroup.getDir()); + return; + } + + //Todo perhaps thrown exception or print out error message is dir is not created successfully + if (!(new File(cgroup.getDir())).mkdir()) { + LOG.error("Could not create cgroup dir at {}", cgroup.getDir()); + } + } + + @Override + public void delete(CgroupCommon cgroup) throws IOException { + cgroup.delete(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java new file mode 100755 index 0000000..fbf96ba --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup; + +import org.apache.storm.container.cgroup.core.CgroupCore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CgroupCommon implements CgroupCommonOperation { + + public static final String TASKS = "/tasks"; + public static final String NOTIFY_ON_RELEASE = "/notify_on_release"; + public static final String RELEASE_AGENT = "/release_agent"; + public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children"; + public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control"; + public static final String CGROUP_PROCS = "/cgroup.procs"; + + private final Hierarchy hierarchy; + + private final String name; + + private final String dir; + + private final CgroupCommon parent; + + private final Map<SubSystemType, CgroupCore> cores; + + private final boolean isRoot; + + private final Set<CgroupCommon> children = new HashSet<CgroupCommon>(); + + private static final Logger LOG = LoggerFactory.getLogger(CgroupCommon.class); + + public CgroupCommon(String name, Hierarchy 
hierarchy, CgroupCommon parent) { + this.name = parent.getName() + "/" + name; + this.hierarchy = hierarchy; + this.parent = parent; + this.dir = parent.getDir() + "/" + name; + this.init(); + cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir); + this.isRoot = false; + } + + /** + * rootCgroup + */ + public CgroupCommon(Hierarchy hierarchy, String dir) { + this.name = ""; + this.hierarchy = hierarchy; + this.parent = null; + this.dir = dir; + this.init(); + cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir); + this.isRoot = true; + } + + @Override + public void addTask(int taskId) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId)); + } + + @Override + public Set<Integer> getTasks() throws IOException { + List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS)); + Set<Integer> tasks = new HashSet<Integer>(); + for (String task : stringTasks) { + tasks.add(Integer.valueOf(task)); + } + return tasks; + } + + @Override + public void addProcs(int pid) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid)); + } + + @Override + public Set<Integer> getPids() throws IOException { + List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS)); + Set<Integer> pids = new HashSet<Integer>(); + for (String task : stringPids) { + pids.add(Integer.valueOf(task)); + } + return pids; + } + + @Override + public void setNotifyOnRelease(boolean flag) throws IOException { + + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0"); + } + + @Override + public boolean getNotifyOnRelease() throws IOException { + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? 
true : false; + } + + @Override + public void setReleaseAgent(String command) throws IOException { + if (!this.isRoot) { + return; + } + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command); + } + + @Override + public String getReleaseAgent() throws IOException { + if (!this.isRoot) { + return null; + } + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0); + } + + @Override + public void setCgroupCloneChildren(boolean flag) throws IOException { + if (!this.cores.keySet().contains(SubSystemType.cpuset)) { + return; + } + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0"); + } + + @Override + public boolean getCgroupCloneChildren() throws IOException { + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false; + } + + @Override + public void setEventControl(String eventFd, String controlFd, String... args) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append(eventFd); + sb.append(' '); + sb.append(controlFd); + for (String arg : args) { + sb.append(' '); + sb.append(arg); + } + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString()); + } + + public Hierarchy getHierarchy() { + return hierarchy; + } + + public String getName() { + return name; + } + + public String getDir() { + return dir; + } + + public CgroupCommon getParent() { + return parent; + } + + public Set<CgroupCommon> getChildren() { + return children; + } + + public boolean isRoot() { + return isRoot; + } + + public Map<SubSystemType, CgroupCore> getCores() { + return cores; + } + + public void delete() throws IOException { + this.free(); + if (!this.isRoot) { + this.parent.getChildren().remove(this); + } + } + + private void free() throws IOException { + for (CgroupCommon child : this.children) { + child.free(); + } + if (this.isRoot) { + return; + } + Set<Integer> tasks = 
this.getTasks(); + if (tasks != null) { + for (Integer task : tasks) { + this.parent.addTask(task); + } + } + CgroupUtils.deleteDir(this.dir); + } + + private void init() { + File file = new File(this.dir); + File[] files = file.listFiles(); + if (files == null) { + return; + } + for (File child : files) { + if (child.isDirectory()) { + this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this)); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java new file mode 100755 index 0000000..f6b4ece --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup; + +import java.io.IOException; +import java.util.Set; + +public interface CgroupCommonOperation { + + /** + * add task to cgroup + * @param taskid task id of task to add + */ + public void addTask(int taskid) throws IOException; + + /** + * Get a list of task ids running in CGroup + */ + public Set<Integer> getTasks() throws IOException; + + /** + * add a process to cgroup + * @param pid the PID of the process to add + */ + public void addProcs(int pid) throws IOException; + + /** + * get the PIDs of processes running in cgroup + */ + public Set<Integer> getPids() throws IOException; + + /** + * to set notify_on_release config in cgroup + */ + public void setNotifyOnRelease(boolean flag) throws IOException; + + /** + * to get the notify_on_release config + */ + public boolean getNotifyOnRelease() throws IOException; + + /** + * set a command for the release agent to execute + */ + public void setReleaseAgent(String command) throws IOException; + + /** + * get the command for the relase agent to execute + */ + public String getReleaseAgent() throws IOException; + + /** + * Set the cgroup.clone_children config + */ + public void setCgroupCloneChildren(boolean flag) throws IOException; + + /** + * get the cgroup.clone_children config + */ + public boolean getCgroupCloneChildren() throws IOException; + + /** + * set event control config + */ + public void setEventControl(String eventFd, String controlFd, String... 
args) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java new file mode 100755 index 0000000..98aedcf --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup; + +import org.apache.storm.container.cgroup.core.BlkioCore; +import org.apache.storm.container.cgroup.core.CgroupCore; +import org.apache.storm.container.cgroup.core.CpuCore; +import org.apache.storm.container.cgroup.core.CpuacctCore; +import org.apache.storm.container.cgroup.core.CpusetCore; +import org.apache.storm.container.cgroup.core.DevicesCore; +import org.apache.storm.container.cgroup.core.FreezerCore; +import org.apache.storm.container.cgroup.core.MemoryCore; +import org.apache.storm.container.cgroup.core.NetClsCore; +import org.apache.storm.container.cgroup.core.NetPrioCore; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class CgroupCoreFactory { + + public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) { + Map<SubSystemType, CgroupCore> result = new HashMap<SubSystemType, CgroupCore>(); + for (SubSystemType type : types) { + switch (type) { + case blkio: + result.put(SubSystemType.blkio, new BlkioCore(dir)); + break; + case cpuacct: + result.put(SubSystemType.cpuacct, new CpuacctCore(dir)); + break; + case cpuset: + result.put(SubSystemType.cpuset, new CpusetCore(dir)); + break; + case cpu: + result.put(SubSystemType.cpu, new CpuCore(dir)); + break; + case devices: + result.put(SubSystemType.devices, new DevicesCore(dir)); + break; + case freezer: + result.put(SubSystemType.freezer, new FreezerCore(dir)); + break; + case memory: + result.put(SubSystemType.memory, new MemoryCore(dir)); + break; + case net_cls: + result.put(SubSystemType.net_cls, new NetClsCore(dir)); + break; + case net_prio: + result.put(SubSystemType.net_prio, new NetPrioCore(dir)); + break; + default: + break; + } + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java new file mode 100644 index 0000000..a3dbd9d --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.container.cgroup; + +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.container.cgroup.core.CpuCore; +import org.apache.storm.container.cgroup.core.MemoryCore; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CgroupManager implements ResourceIsolationInterface { + + private static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class); + + private CgroupCenter center; + + private Hierarchy hierarchy; + + private CgroupCommon rootCgroup; + + private static String rootDir; + + private Map conf; + + public void prepare(Map conf) throws IOException { + this.conf = conf; + this.rootDir = Config.getCgroupRootDir(this.conf); + if (this.rootDir == null) { + throw new RuntimeException("Check configuration file. The supervisor.cgroup.rootdir is missing."); + } + + File file = new File(Config.getCgroupStormHierarchyDir(conf) + "/" + this.rootDir); + if (!file.exists()) { + LOG.error("{}/{} is not existing.", Config.getCgroupStormHierarchyDir(conf), this.rootDir); + throw new RuntimeException("Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file."); + } + this.center = CgroupCenter.getInstance(); + if (this.center == null) { + throw new RuntimeException("Cgroup error, please check /proc/cgroups"); + } + this.prepareSubSystem(this.conf); + } + + /** + * User cfs_period & cfs_quota to control the upper limit use of cpu core e.g. 
+ * If making a process to fully use two cpu cores, set cfs_period_us to + * 100000 and set cfs_quota_us to 200000 + */ + private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) throws IOException { + + if (cpuCoreUpperLimit == -1) { + // No control of cpu usage + cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit); + } else { + cpuCore.setCpuCfsPeriodUs(100000); + cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit * 1000); + } + } + + public String startNewWorker(String workerId, Map resourcesMap) throws SecurityException { + Number cpuNum = (Number) resourcesMap.get("cpu"); + Number totalMem = null; + if (resourcesMap.get("memory") != null) { + totalMem = (Number) resourcesMap.get("memory"); + } + + CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, this.rootCgroup); + this.center.create(workerGroup); + + if (cpuNum != null) { + CpuCore cpuCore = (CpuCore) workerGroup.getCores().get(SubSystemType.cpu); + try { + cpuCore.setCpuShares(cpuNum.intValue()); + } catch (IOException e) { + throw new RuntimeException("Cannot set cpu.shares! Exception: " + e); + } + } + + if (totalMem != null) { + MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory); + try { + memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024)); + } catch (IOException e) { + throw new RuntimeException("Cannot set memory.limit_in_bytes! 
Exception: " + e); + } + } + + StringBuilder sb = new StringBuilder(); + + sb.append(this.conf.get(Config.STORM_CGROUP_CGEXEC_CMD)).append(" -g "); + + Iterator<SubSystemType> it = this.hierarchy.getSubSystems().iterator(); + while(it.hasNext()) { + sb.append(it.next().toString()); + if(it.hasNext()) { + sb.append(","); + } else { + sb.append(":"); + } + } + + sb.append(workerGroup.getName()); + + return sb.toString(); + } + + public void shutDownWorker(String workerId, boolean isKilled) { + CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, this.rootCgroup); + try { + if (isKilled == false) { + for (Integer pid : workerGroup.getTasks()) { + Utils.kill(pid); + } + Utils.sleepMs(1500); + } + Set<Integer> tasks = workerGroup.getTasks(); + if (isKilled == true && !tasks.isEmpty()) { + throw new Exception("Cannot correctly showdown worker CGroup " + workerId + "tasks " + tasks.toString() + " still running!"); + } + this.center.delete(workerGroup); + } catch (Exception e) { + LOG.error("Exception thrown when shutting worker {} Exception: {}", workerId, e); + } + } + + public void close() throws IOException { + this.center.delete(this.rootCgroup); + } + + private void prepareSubSystem(Map conf) throws IOException { + List<SubSystemType> subSystemTypes = new LinkedList<>(); + for (String resource : Config.getCgroupStormResources(conf)) { + subSystemTypes.add(SubSystemType.getSubSystem(resource)); + } + + this.hierarchy = center.busy(subSystemTypes); + + if (this.hierarchy == null) { + Set<SubSystemType> types = new HashSet<SubSystemType>(); + types.add(SubSystemType.cpu); + this.hierarchy = new Hierarchy(Config.getCgroupStormHierarchyName(conf), types, Config.getCgroupStormHierarchyDir(conf)); + } + this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, this.hierarchy.getRootCgroups()); + + // set upper limit to how much cpu can be used by all workers running on supervisor node. 
+ // This is done so that some cpu cycles will remain free to run the daemons and other miscellaneous OS operations. + CpuCore supervisorRootCPU = (CpuCore) this.rootCgroup.getCores().get(SubSystemType.cpu); + setCpuUsageUpperLimit(supervisorRootCPU, ((Number) this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java new file mode 100755 index 0000000..aa315ba --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.container.cgroup; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public interface CgroupOperation { + + public List<Hierarchy> getHierarchies(); + + public Set<SubSystem> getSubSystems(); + + public boolean enabled(SubSystemType subsystem); + + public Hierarchy busy(SubSystemType subsystem); + + public Hierarchy busy(List<SubSystemType> subSystems); + + public Hierarchy mounted(Hierarchy hierarchy); + + public void mount(Hierarchy hierarchy) throws IOException; + + public void umount(Hierarchy hierarchy) throws IOException; + + public void create(CgroupCommon cgroup) throws SecurityException; + + public void delete(CgroupCommon cgroup) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java new file mode 100644 index 0000000..7c88f5d --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class CgroupUtils { + + private static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class); + + public static void deleteDir(String dir) { + File d = new File(dir); + if (d.exists()) { + if (d.isDirectory()) { + if (!d.delete()) { + throw new RuntimeException("Cannot delete dir " + dir); + } + } else { + throw new RuntimeException("dir " + dir + " is not a directory!"); + } + } else { + LOG.warn("dir {} does not exist!", dir); + } + } + + public static boolean fileExists(String dir) { + File file = new File(dir); + return file.exists(); + } + + public static boolean dirExists(String dir) { + File file = new File(dir); + return file.isDirectory(); + } + + public static Set<SubSystemType> analyse(String str) { + Set<SubSystemType> result = new HashSet<SubSystemType>(); + String[] subSystems = str.split(","); + for (String subSystem : subSystems) { + SubSystemType type = SubSystemType.getSubSystem(subSystem); + if (type != null) { + result.add(type); + } + } + return result; + } + + public static String reAnalyse(Set<SubSystemType> subSystems) { + StringBuilder sb = new StringBuilder(); + if (subSystems.size() == 0) { + return sb.toString(); + } + for (SubSystemType type : subSystems) { + sb.append(type.name()).append(","); + } + return sb.toString().substring(0, sb.length() - 1); + } + + public static boolean enabled() { + return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE); + } + + public static List<String> readFileByLine(String fileDir) throws 
IOException { + List<String> result = new ArrayList<String>(); + File file = new File(fileDir); + try (FileReader fileReader = new FileReader(file); + BufferedReader reader = new BufferedReader(fileReader)) { + String tempString = null; + while ((tempString = reader.readLine()) != null) { + result.add(tempString); + } + } + return result; + } + + public static void writeFileByLine(String fileDir, List<String> strings) throws IOException { + File file = new File(fileDir); + if (!file.exists()) { + LOG.error("{} is no existed", fileDir); + return; + } + try (FileWriter writer = new FileWriter(file, true); + BufferedWriter bw = new BufferedWriter(writer)) { + for (String string : strings) { + bw.write(string); + bw.newLine(); + bw.flush(); + } + } + } + + public static void writeFileByLine(String fileDir, String string) throws IOException { + LOG.debug("For CGroups - writing {} to {} ", string, fileDir); + File file = new File(fileDir); + if (!file.exists()) { + LOG.error("{} is no existed", fileDir); + return; + } + try (FileWriter writer = new FileWriter(file, true); + BufferedWriter bw = new BufferedWriter(writer)) { + bw.write(string); + bw.newLine(); + bw.flush(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java new file mode 100755 index 0000000..0ce9643 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Constants.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Shared constants and a small path helper for the cgroup code.
 */
public class Constants {

    /** Path of the cgroup status file. */
    public static final String CGROUP_STATUS_FILE = "/proc/cgroups";

    /** Path of the mount status file. */
    public static final String MOUNT_STATUS_FILE = "/proc/mounts";

    /**
     * Concatenates a directory and a constant without inserting a separator;
     * the constant is expected to carry its own leading separator.
     *
     * @param dir      the directory part
     * @param constant the part appended directly after {@code dir}
     * @return {@code dir} immediately followed by {@code constant}
     */
    public static String getDir(String dir, String constant) {
        StringBuilder path = new StringBuilder();
        path.append(dir).append(constant);
        return path.toString();
    }

}
/**
 * Immutable device identifier made of a major and a minor number, rendered
 * as {@code "major:minor"}.
 */
public class Device {

    public final int major;
    public final int minor;

    /**
     * Creates a device from its two numbers.
     *
     * @param major the major number
     * @param minor the minor number
     */
    public Device(int major, int minor) {
        this.major = major;
        this.minor = minor;
    }

    /**
     * Parses a device from its {@code "major:minor"} form. Fields after the
     * second one are ignored.
     *
     * @param str the text to parse
     * @throws IllegalArgumentException if {@code str} is {@code null} or has
     *         fewer than two ':'-separated fields
     * @throws NumberFormatException if either field is not a valid integer
     */
    public Device(String str) {
        if (str == null) {
            throw new IllegalArgumentException("Device string must not be null");
        }
        String[] parts = str.split(":");
        if (parts.length < 2) {
            // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException("Device string must look like <major>:<minor> but was: " + str);
        }
        this.major = Integer.parseInt(parts[0]);
        this.minor = Integer.parseInt(parts[1]);
    }

    /** Returns the device as {@code "major:minor"}. */
    @Override
    public String toString() {
        return major + ":" + minor;
    }

    @Override
    public int hashCode() {
        // Same value as the usual 31-based accumulation over (major, minor).
        return 31 * (31 + major) + minor;
    }

    /** Two devices are equal when both their major and minor numbers match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Device other = (Device) obj;
        return major == other.major && minor == other.minor;
    }
}
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup; + +import java.util.Set; + +public class Hierarchy { + + private final String name; + + private final Set<SubSystemType> subSystems; + + private final String type; + + private final String dir; + + private final CgroupCommon rootCgroups; + + public Hierarchy(String name, Set<SubSystemType> subSystems, String dir) { + this.name = name; + this.subSystems = subSystems; + this.dir = dir; + this.rootCgroups = new CgroupCommon(this, dir); + this.type = CgroupUtils.reAnalyse(subSystems); + } + + public Set<SubSystemType> getSubSystems() { + return subSystems; + } + + public String getType() { + return type; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((dir == null) ? 0 : dir.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((type == null) ? 
0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Hierarchy other = (Hierarchy) obj; + if (dir == null) { + if (other.dir != null) { + return false; + } + } else if (!dir.equals(other.dir)) { + return false; + } + if (name == null) { + if (other.name != null) { + return false; + } + } else if (!name.equals(other.name)) { + return false; + } + if (type == null) { + if (other.type != null) { + return false; + } + } else if (!type.equals(other.type)) { + return false; + } + return true; + } + + public String getDir() { + return dir; + } + + public CgroupCommon getRootCgroups() { + return rootCgroups; + } + + public String getName() { + return name; + } + + public boolean subSystemMounted(SubSystemType subsystem) { + for (SubSystemType type : this.subSystems) { + if (type == subsystem) { + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java new file mode 100755 index 0000000..ac62e61 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystem.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup; + +public class SubSystem { + + private SubSystemType type; + + private int hierarchyID; + + private int cgroupsNum; + + private boolean enable; + + public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, boolean enable) { + this.type = type; + this.hierarchyID = hierarchyID; + this.cgroupsNum = cgroupNum; + this.enable = enable; + } + + public SubSystemType getType() { + return type; + } + + public void setType(SubSystemType type) { + this.type = type; + } + + public int getHierarchyID() { + return hierarchyID; + } + + public void setHierarchyID(int hierarchyID) { + this.hierarchyID = hierarchyID; + } + + public int getCgroupsNum() { + return cgroupsNum; + } + + public void setCgroupsNum(int cgroupsNum) { + this.cgroupsNum = cgroupsNum; + } + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + + @Override + public boolean equals(Object object) { + boolean ret = false; + if (object != null && object instanceof SubSystem) { + ret = (this.type.equals(((SubSystem)object).getType()) && this.hierarchyID == ((SubSystem)object).getHierarchyID()); + } + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java 
/**
 * The cgroup subsystem types known to this code. Each constant is named
 * exactly like the subsystem it stands for.
 */
public enum SubSystemType {

    // Original author's note: net_cls and ns are not supported on Ubuntu.
    blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;

    /**
     * Resolves a subsystem name to its constant. The match is exact and
     * case-sensitive.
     *
     * @param str the subsystem name; must not be {@code null}
     * @return the constant with that name, or {@code null} if there is none
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public static SubSystemType getSubSystem(String str) {
        for (SubSystemType candidate : values()) {
            if (str.equals(candidate.name())) {
                return candidate;
            }
        }
        return null;
    }
}
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java new file mode 100644 index 0000000..ee3517a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.container.cgroup; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class SystemOperation { + + private static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class); + + public static boolean isRoot() throws IOException { + String result = SystemOperation.exec("echo $EUID").substring(0, 1); + return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? 
true : false; + }; + + public static void mount(String name, String target, String type, String data) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target); + SystemOperation.exec(sb.toString()); + } + + public static void umount(String name) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append("umount ").append(name); + SystemOperation.exec(sb.toString()); + } + + public static String exec(String cmd) throws IOException { + LOG.debug("Shell cmd: {}", cmd); + Process process = new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd }).start(); + try { + process.waitFor(); + String output = IOUtils.toString(process.getInputStream()); + String errorOutput = IOUtils.toString(process.getErrorStream()); + LOG.debug("Shell Output: {}", output); + if (errorOutput.length() != 0) { + LOG.error("Shell Error Output: {}", errorOutput); + throw new IOException(errorOutput); + } + return output; + } catch (InterruptedException ie) { + throw new IOException(ie.toString()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java new file mode 100755 index 0000000..5522601 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.CgroupUtils; +import org.apache.storm.container.cgroup.Constants; +import org.apache.storm.container.cgroup.SubSystemType; +import org.apache.storm.container.cgroup.Device; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BlkioCore implements CgroupCore { + + public static final String BLKIO_WEIGHT = "/blkio.weight"; + public static final String BLKIO_WEIGHT_DEVICE = "/blkio.weight_device"; + public static final String BLKIO_RESET_STATS = "/blkio.reset_stats"; + + public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = "/blkio.throttle.read_bps_device"; + public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = "/blkio.throttle.write_bps_device"; + public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = "/blkio.throttle.read_iops_device"; + public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = "/blkio.throttle.write_iops_device"; + + public static final String BLKIO_THROTTLE_IO_SERVICED = "/blkio.throttle.io_serviced"; + public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = "/blkio.throttle.io_service_bytes"; + + public static final String BLKIO_TIME = "/blkio.time"; + public static final String BLKIO_SECTORS = "/blkio.sectors"; + public static final String BLKIO_IO_SERVICED = 
"/blkio.io_serviced"; + public static final String BLKIO_IO_SERVICE_BYTES = "/blkio.io_service_bytes"; + public static final String BLKIO_IO_SERVICE_TIME = "/blkio.io_service_time"; + public static final String BLKIO_IO_WAIT_TIME = "/blkio.io_wait_time"; + public static final String BLKIO_IO_MERGED = "/blkio.io_merged"; + public static final String BLKIO_IO_QUEUED = "/blkio.io_queued"; + + private final String dir; + + public BlkioCore(String dir) { + this.dir = dir; + } + + @Override + public SubSystemType getType() { + return SubSystemType.blkio; + } + + /* weight: 100-1000 */ + public void setBlkioWeight(int weight) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT), String.valueOf(weight)); + } + + public int getBlkioWeight() throws IOException { + return Integer.valueOf(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0)).intValue(); + } + + public void setBlkioWeightDevice(Device device, int weight) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE), makeContext(device, weight)); + } + + public Map<Device, Integer> getBlkioWeightDevice() throws IOException { + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE)); + Map<Device, Integer> result = new HashMap<Device, Integer>(); + for (String string : strings) { + String[] strArgs = string.split(" "); + Device device = new Device(strArgs[0]); + Integer weight = Integer.valueOf(strArgs[1]); + result.put(device, weight); + } + return result; + } + + public void setReadBps(Device device, long bps) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps)); + } + + public Map<Device, Long> getReadBps() throws IOException { + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE)); + Map<Device, Long> result = new HashMap<Device, 
Long>(); + for (String string : strings) { + String[] strArgs = string.split(" "); + Device device = new Device(strArgs[0]); + Long bps = Long.valueOf(strArgs[1]); + result.put(device, bps); + } + return result; + } + + public void setWriteBps(Device device, long bps) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps)); + } + + public Map<Device, Long> getWriteBps() throws IOException { + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE)); + Map<Device, Long> result = new HashMap<Device, Long>(); + for (String string : strings) { + String[] strArgs = string.split(" "); + Device device = new Device(strArgs[0]); + Long bps = Long.valueOf(strArgs[1]); + result.put(device, bps); + } + return result; + } + + public void setReadIOps(Device device, long iops) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops)); + } + + public Map<Device, Long> getReadIOps() throws IOException { + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE)); + Map<Device, Long> result = new HashMap<Device, Long>(); + for (String string : strings) { + String[] strArgs = string.split(" "); + Device device = new Device(strArgs[0]); + Long iops = Long.valueOf(strArgs[1]); + result.put(device, iops); + } + return result; + } + + public void setWriteIOps(Device device, long iops) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops)); + } + + public Map<Device, Long> getWriteIOps() throws IOException { + List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE)); + Map<Device, Long> result = new HashMap<Device, Long>(); + for (String string : strings) { + String[] strArgs = 
string.split(" "); + Device device = new Device(strArgs[0]); + Long iops = Long.valueOf(strArgs[1]); + result.put(device, iops); + } + return result; + } + + public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICED))); + } + + public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES))); + } + + public Map<Device, Long> getBlkioTime() throws IOException { + Map<Device, Long> result = new HashMap<Device, Long>(); + List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_TIME)); + for (String str : strs) { + String[] strArgs = str.split(" "); + result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1])); + } + return result; + } + + public Map<Device, Long> getBlkioSectors() throws IOException { + Map<Device, Long> result = new HashMap<Device, Long>(); + List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_SECTORS)); + for (String str : strs) { + String[] strArgs = str.split(" "); + result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1])); + } + return result; + } + + public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICED))); + } + + public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_BYTES))); + } + + public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_TIME))); + } + + public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException { + 
return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_WAIT_TIME))); + } + + public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_MERGED))); + } + + public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException { + return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_QUEUED))); + } + + public void resetStats() throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_RESET_STATS), "1"); + } + + private String makeContext(Device device, Object data) { + StringBuilder sb = new StringBuilder(); + sb.append(device.toString()).append(" ").append(data); + return sb.toString(); + } + + private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) { + Map<Device, Map<RecordType, Long>> result = new HashMap<Device, Map<RecordType, Long>>(); + for (String str : strs) { + String[] strArgs = str.split(" "); + if (strArgs.length != 3) { + continue; + } + Device device = new Device(strArgs[0]); + RecordType key = RecordType.getType(strArgs[1]); + Long value = Long.parseLong(strArgs[2]); + Map<RecordType, Long> record = result.get(device); + if (record == null) { + record = new HashMap<RecordType, Long>(); + result.put(device, record); + } + record.put(key, value); + } + return result; + } + + public enum RecordType { + read, write, sync, async, total; + + public static RecordType getType(String type) { + if (type.equals("Read")) { + return read; + } + else if (type.equals("Write")) { + return write; + } + else if (type.equals("Sync")) { + return sync; + } + else if (type.equals("Async")) { + return async; + } + else if (type.equals("Total")) { + return total; + } + else { + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fc063ecc/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java new file mode 100755 index 0000000..a6b098e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.container.cgroup.core; + +import org.apache.storm.container.cgroup.SubSystemType; + +public interface CgroupCore { + + public SubSystemType getType(); + +}
