http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java deleted file mode 100644 index 69ec1ed..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.cgroup; - -import java.util.Set; - -public class Hierarchy { - - private final String name; - - private final Set<ResourceType> resourceTypes; - - private final String type; - - private final String dir; - - private final CgroupCommon rootCgroups; - - public Hierarchy(String name, Set<ResourceType> resourceTypes, String dir) { - this.name = name; - this.resourceTypes = resourceTypes; - this.dir = dir; - this.rootCgroups = new CgroupCommon(this, dir); - this.type = CgroupUtils.reAnalyse(resourceTypes); - } - - public Set<ResourceType> getResourceTypes() { - return resourceTypes; - } - - public String getType() { - return type; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((dir == null) ? 0 : dir.hashCode()); - result = prime * result + ((name == null) ? 0 : name.hashCode()); - result = prime * result + ((type == null) ? 0 : type.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Hierarchy other = (Hierarchy) obj; - if (dir == null) { - if (other.dir != null) - return false; - } else if (!dir.equals(other.dir)) - return false; - if (name == null) { - if (other.name != null) - return false; - } else if (!name.equals(other.name)) - return false; - if (type == null) { - if (other.type != null) - return false; - } else if (!type.equals(other.type)) - return false; - return true; - } - - public String getDir() { - return dir; - } - - public CgroupCommon getRootCgroups() { - return rootCgroups; - } - - public String getName() { - return name; - } - - public boolean subSystemMounted(ResourceType subsystem) { - for (ResourceType type : this.resourceTypes) { - if (type == subsystem) - return true; - } - return false; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java deleted file mode 100644 index c2a1d42..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.cgroup; - -public enum ResourceType { - - // net_cls,ns is not supposted in ubuntu - blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio; - - public static ResourceType getResourceType(String str) { - if (str.equals("cpu")) - return cpu; - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java deleted file mode 100644 index 23e630c..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.cgroup.core; - -import io.gearpump.cluster.cgroup.ResourceType; - -public interface CgroupCore { - - public ResourceType getType(); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java deleted file mode 100644 index 3402d5a..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.cgroup.core; - -import io.gearpump.cluster.cgroup.CgroupUtils; -import io.gearpump.cluster.cgroup.Constants; -import io.gearpump.cluster.cgroup.ResourceType; - -import java.io.IOException; - -public class CpuCore implements CgroupCore { - - public static final String CPU_SHARES = "/cpu.shares"; - public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us"; - public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us"; - - private final String dir; - - public CpuCore(String dir) { - this.dir = dir; - } - - @Override - public ResourceType getType() { - // TODO Auto-generated method stub - return ResourceType.cpu; - } - - public void setCpuShares(int weight) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight)); - } - - public int getCpuShares() throws IOException { - return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0)); - } - - public void setCpuCfsPeriodUs(long us) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us)); - } - - public void setCpuCfsQuotaUs(long us) throws IOException { - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java deleted file mode 100644 index 0772133..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.utils; - -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class SystemOperation { - - public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class); - - public static void mount(String name, String target, String type, String data) throws IOException { - StringBuilder sb = new StringBuilder(); - sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target); - SystemOperation.exec(sb.toString()); - } - - public static void umount(String name) throws IOException { - StringBuilder sb = new StringBuilder(); - sb.append("umount ").append(name); - SystemOperation.exec(sb.toString()); - } - - public static String exec(String cmd) throws IOException { - LOG.debug("Shell cmd: " + cmd); - Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start(); - try { - process.waitFor(); - String output = IOUtils.toString(process.getInputStream()); - String errorOutput = IOUtils.toString(process.getErrorStream()); - LOG.debug("Shell Output: " + output); - if (errorOutput.length() != 0) { - LOG.error("Shell Error Output: " + errorOutput); - throw new IOException(errorOutput); - } - return output; - } catch (InterruptedException ie) { - throw new IOException(ie.toString()); - } - } - - public static void main(String[] args) throws IOException { - SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java new file mode 100644 index 0000000..a616593 --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +public class CGroupResource { + + private ResourceType type; + + private int hierarchyID; + + private int cgroupsNum; + + private boolean enable; + + public CGroupResource(ResourceType type, int hierarchyID, int cgroupNum, boolean enable) { + this.type = type; + this.hierarchyID = hierarchyID; + this.cgroupsNum = cgroupNum; + this.enable = enable; + } + + public ResourceType getType() { + return type; + } + + public void setType(ResourceType type) { + this.type = type; + } + + public int getHierarchyID() { + return hierarchyID; + } + + public void setHierarchyID(int hierarchyID) { + this.hierarchyID = hierarchyID; + } + + public int getCgroupsNum() { + return cgroupsNum; + } + + public void setCgroupsNum(int cgroupsNum) { + this.cgroupsNum = cgroupsNum; + } + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java new file mode 100644 index 0000000..fb2ba65 --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +import org.apache.gearpump.cluster.utils.SystemOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class CgroupCenter implements CgroupOperation { + + public static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class); + + private static CgroupCenter instance; + + private CgroupCenter() { + + } + + /** + * Thread unsafe + * + * @return + */ + public synchronized static CgroupCenter getInstance() { + if (instance == null) + instance = new CgroupCenter(); + return CgroupUtils.enabled() ? instance : null; + } + + @Override + public List<Hierarchy> getHierarchies() { + // TODO Auto-generated method stub + Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>(); + FileReader reader = null; + BufferedReader br = null; + try { + reader = new FileReader(Constants.MOUNT_STATUS_FILE); + br = new BufferedReader(reader); + String str = null; + while ((str = br.readLine()) != null) { + String[] strSplit = str.split(" "); + if (!strSplit[2].equals("cgroup")) + continue; + String name = strSplit[0]; + String type = strSplit[3]; + String dir = strSplit[1]; + Hierarchy h = hierarchies.get(type); + h = new Hierarchy(name, CgroupUtils.analyse(type), dir); + hierarchies.put(type, h); + } + return new ArrayList<Hierarchy>(hierarchies.values()); + } catch (Exception e) { + LOG.error("Get hierarchies error", e); + } finally { + CgroupUtils.close(reader, br); + } + return null; + } + + @Override + public Set<CGroupResource> getCGroupResources() { + // TODO Auto-generated method stub + Set<CGroupResource> resources = new HashSet<CGroupResource>(); + FileReader reader = null; + BufferedReader br = null; + try { + reader = new FileReader(Constants.CGROUP_STATUS_FILE); + br = new BufferedReader(reader); + String str = null; + while ((str = br.readLine()) != null) { + String[] split = str.split("\t"); + ResourceType type = ResourceType.getResourceType(split[0]); + if (type == null) + continue; + resources.add(new CGroupResource(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]), Integer.valueOf(split[3]).intValue() == 1 ? true + : false)); + } + return resources; + } catch (Exception e) { + LOG.error("Get subSystems error ", e); + } finally { + CgroupUtils.close(reader, br); + } + return null; + } + + @Override + public boolean enabled(ResourceType resourceType) { + // TODO Auto-generated method stub + Set<CGroupResource> resources = this.getCGroupResources(); + for (CGroupResource resource : resources) { + if (resource.getType() == resourceType) + return true; + } + return false; + } + + @Override + public Hierarchy busy(ResourceType resourceType) { + List<Hierarchy> hierarchies = this.getHierarchies(); + for (Hierarchy hierarchy : hierarchies) { + for (ResourceType type : hierarchy.getResourceTypes()) { + if (type == resourceType) + return hierarchy; + } + } + return null; + } + + @Override + public Hierarchy mounted(Hierarchy hierarchy) { + // TODO Auto-generated method stub + List<Hierarchy> hierarchies = this.getHierarchies(); + if (CgroupUtils.dirExists(hierarchy.getDir())) { + for (Hierarchy h : hierarchies) { + if (h.equals(hierarchy)) + return h; + } + } + return null; + } + + @Override + public void mount(Hierarchy hierarchy) throws IOException { + // TODO Auto-generated method stub + if (this.mounted(hierarchy) != null) { + LOG.error(hierarchy.getDir() + " is mounted"); + return; + } + Set<ResourceType> resourceTypes = hierarchy.getResourceTypes(); + for (ResourceType type : resourceTypes) { + if (this.busy(type) != null) { + LOG.error("subsystem: " + type.name() + " is busy"); + resourceTypes.remove(type); + } + } + if (resourceTypes.size() == 0) + return; + if (!CgroupUtils.dirExists(hierarchy.getDir())) + new File(hierarchy.getDir()).mkdirs(); + String subSystems = CgroupUtils.reAnalyse(resourceTypes); + SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems); + } + + @Override + public void umount(Hierarchy hierarchy) throws IOException { + // TODO Auto-generated method stub + if (this.mounted(hierarchy) != null) { + hierarchy.getRootCgroups().delete(); + SystemOperation.umount(hierarchy.getDir()); + CgroupUtils.deleteDir(hierarchy.getDir()); + } + } + + @Override + public void create(CgroupCommon cgroup) throws SecurityException { + // TODO Auto-generated method stub + if (cgroup.isRoot()) { + LOG.error("You can't create rootCgroup in this function"); + return; + } + CgroupCommon parent = cgroup.getParent(); + while (parent != null) { + if (!CgroupUtils.dirExists(parent.getDir())) { + LOG.error(parent.getDir() + "is not existed"); + return; + } + parent = parent.getParent(); + } + Hierarchy h = cgroup.getHierarchy(); + if (mounted(h) == null) { + LOG.error(h.getDir() + " is not mounted"); + return; + } + if (CgroupUtils.dirExists(cgroup.getDir())) { + LOG.error(cgroup.getDir() + " is existed"); + return; + } + (new File(cgroup.getDir())).mkdir(); + } + + @Override + public void delete(CgroupCommon cgroup) throws IOException { + // TODO Auto-generated method stub + cgroup.delete(); + } + + public static void main(String args[]) { + System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java new file mode 100644 index 0000000..2bccfec --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +import org.apache.gearpump.cluster.cgroup.core.CgroupCore; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CgroupCommon implements CgroupCommonOperation { + + public static final String TASKS = "/tasks"; + public static final String NOTIFY_ON_RELEASE = "/notify_on_release"; + public static final String RELEASE_AGENT = "/release_agent"; + public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children"; + public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control"; + public static final String CGROUP_PROCS = "/cgroup.procs"; + + private final Hierarchy hierarchy; + + private final String name; + + private final String dir; + + private final CgroupCommon parent; + + private final Map<ResourceType, CgroupCore> cores; + + private final boolean isRoot; + + private final Set<CgroupCommon> children = new HashSet<CgroupCommon>(); + + public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) { + this.name = parent.getName() + "/" + name; + this.hierarchy = hierarchy; + this.parent = parent; + this.dir = parent.getDir() + "/" + name; + this.init(); + cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir); + this.isRoot = false; + } + + /** + * rootCgroup + */ + public CgroupCommon(Hierarchy hierarchy, String dir) { + this.name = ""; + this.hierarchy = hierarchy; + this.parent = null; + this.dir = dir; + this.init(); + cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir); + this.isRoot = true; + } + + @Override + public void addTask(int taskId) throws IOException { + // TODO Auto-generated method stub + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId)); + } + + @Override + public Set<Integer> getTasks() throws IOException { + List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS)); + Set<Integer> tasks = new HashSet<Integer>(); + for (String task : stringTasks) { + tasks.add(Integer.valueOf(task)); + } + return tasks; + } + + @Override + public void addProcs(int pid) throws IOException { + // TODO Auto-generated method stub + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid)); + } + + @Override + public Set<Integer> getPids() throws IOException { + // TODO Auto-generated method stub + List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS)); + Set<Integer> pids = new HashSet<Integer>(); + for (String task : stringPids) { + pids.add(Integer.valueOf(task)); + } + return pids; + } + + @Override + public void setNotifyOnRelease(boolean flag) throws IOException { + // TODO Auto-generated method stub + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0"); + } + + @Override + public boolean getNotifyOnRelease() throws IOException { + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false; + } + + @Override + public void setReleaseAgent(String command) throws IOException { + // TODO Auto-generated method stub + if (!this.isRoot) + return; + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command); + } + + @Override + public String getReleaseAgent() throws IOException { + if (!this.isRoot) + return null; + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0); + } + + @Override + public void setCgroupCloneChildren(boolean flag) throws IOException { + // TODO Auto-generated method stub + if (!this.cores.keySet().contains(ResourceType.cpuset)) + return; + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0"); + } + + @Override + public boolean getCgroupCloneChildren() throws IOException { + return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false; + } + + @Override + public void setEventControl(String eventFd, String controlFd, String... args) throws IOException { + // TODO Auto-generated method stub + StringBuilder sb = new StringBuilder(); + sb.append(eventFd); + sb.append(' '); + sb.append(controlFd); + for (String arg : args) { + sb.append(' '); + sb.append(arg); + } + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString()); + } + + public Hierarchy getHierarchy() { + return hierarchy; + } + + public String getName() { + return name; + } + + public String getDir() { + return dir; + } + + public CgroupCommon getParent() { + return parent; + } + + public Set<CgroupCommon> getChildren() { + return children; + } + + public boolean isRoot() { + return isRoot; + } + + public Map<ResourceType, CgroupCore> getCores() { + return cores; + } + + public void delete() throws IOException { + this.free(); + if (!this.isRoot) + this.parent.getChildren().remove(this); + } + + private void free() throws IOException { + for (CgroupCommon child : this.children) + child.free(); + if (this.isRoot) + return; + Set<Integer> tasks = this.getTasks(); + if (tasks != null) { + for (Integer task : tasks) { + this.parent.addTask(task); + } + } + CgroupUtils.deleteDir(this.dir); + } + + private void init() { + File file = new File(this.dir); + File[] files = file.listFiles(); + if (files == null) + return; + for (File child : files) { + if (child.isDirectory()) { + this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java new file mode 100644 index 0000000..9ae923b --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +import java.io.IOException; +import java.util.Set; + +public interface CgroupCommonOperation { + + public void addTask(int taskid) throws IOException; + + public Set<Integer> getTasks() throws IOException; + + public void addProcs(int pid) throws IOException; + + public Set<Integer> getPids() throws IOException; + + public void setNotifyOnRelease(boolean flag) throws IOException; + + public boolean getNotifyOnRelease() throws IOException; + + public void setReleaseAgent(String command) throws IOException; + + public String getReleaseAgent() throws IOException; + + public void setCgroupCloneChildren(boolean flag) throws IOException; + + public boolean getCgroupCloneChildren() throws IOException; + + public void setEventControl(String eventFd, String controlFd, String... args) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java new file mode 100644 index 0000000..15ebcbc --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +import org.apache.gearpump.cluster.cgroup.core.CgroupCore; +import org.apache.gearpump.cluster.cgroup.core.CpuCore; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class CgroupCoreFactory { + + public static Map<ResourceType, CgroupCore> getInstance(Set<ResourceType> types, String dir) { + Map<ResourceType, CgroupCore> result = new HashMap<ResourceType, CgroupCore>(); + for (ResourceType type : types) { + switch (type) { + case cpu: + result.put(ResourceType.cpu, new CpuCore(dir)); + break; + default: + break; + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java new file mode 100644 index 0000000..399f3bd --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public interface CgroupOperation { + + public List<Hierarchy> getHierarchies(); + + public Set<CGroupResource> getCGroupResources(); + + public boolean enabled(ResourceType subsystem); + + public Hierarchy busy(ResourceType subsystem); + + public Hierarchy mounted(Hierarchy hierarchy); + + public void mount(Hierarchy hierarchy) throws IOException; + + public void umount(Hierarchy hierarchy) throws IOException; + + public void create(CgroupCommon cgroup) throws SecurityException; + + public void delete(CgroupCommon cgroup) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java new file mode 100644 index 0000000..8f199d9 --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +import org.apache.gearpump.cluster.utils.SystemOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class CgroupUtils { + + public static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class); + + public static void deleteDir(String dir) { + try { + String cmd = "rmdir " + dir; + SystemOperation.exec(cmd); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.error("rm " + dir + " fail!", e); + } + } + + public static boolean fileExists(String dir) { + File file = new File(dir); + return file.exists(); + } + + public static boolean dirExists(String dir) { + File file = new File(dir); + return file.isDirectory(); + } + + public static Set<ResourceType> analyse(String str) { + Set<ResourceType> result = new HashSet<ResourceType>(); + String[] subSystems = str.split(","); + for (String subSystem : subSystems) { + ResourceType type = ResourceType.getResourceType(subSystem); + if (type != null) + result.add(type); + } + return result; + } + + public static String reAnalyse(Set<ResourceType> subSystems) { + StringBuilder sb = new StringBuilder(); + if (subSystems.size() == 0) + return sb.toString(); + for (ResourceType type : subSystems) { + sb.append(type.name()).append(","); + } + return sb.toString().substring(0, sb.length() - 1); + } + + public static boolean enabled() { + return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE); + } + + public static List<String> readFileByLine(String fileDir) throws IOException { + List<String> result = new ArrayList<String>(); + FileReader fileReader = null; + BufferedReader reader = null; + try { + File file = new File(fileDir); + fileReader = new FileReader(file); + reader = new BufferedReader(fileReader); + String tempString = null; + while ((tempString = reader.readLine()) != null) { + result.add(tempString); + } + } finally { + CgroupUtils.close(fileReader, reader); + } + return result; + } + + public static void writeFileByLine(String fileDir, List<String> strings) throws IOException { + FileWriter writer = null; + BufferedWriter bw = null; + try { + File file = new File(fileDir); + if (!file.exists()) { + LOG.error(fileDir + " is no existed"); + return; + } + writer = new FileWriter(file, true); + bw = new BufferedWriter(writer); + for (String string : strings) { + bw.write(string); + bw.newLine(); + bw.flush(); + } + } finally { + CgroupUtils.close(writer, bw); + } + } + + public static void writeFileByLine(String fileDir, String string) throws IOException { + FileWriter writer = null; + BufferedWriter bw = null; + try { + File file = new File(fileDir); + if (!file.exists()) { + LOG.error(fileDir + " is no existed"); + return; + } + writer = new FileWriter(file, true); + bw = new BufferedWriter(writer); + bw.write(string); + bw.newLine(); + bw.flush(); + } finally { + CgroupUtils.close(writer, bw); + } + } + + public static void close(FileReader reader, BufferedReader br) { + try { + if (reader != null) + reader.close(); + if (br != null) + br.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + + } + } + + public static void close(FileWriter writer, BufferedWriter bw) { + try { + if (writer != null) + writer.close(); + if (bw != null) + bw.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java new file mode 100644 index 0000000..fb905c7 --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +public class Constants { + + public static final String CGROUP_STATUS_FILE = "/proc/cgroups"; + + public static final String MOUNT_STATUS_FILE = "/proc/mounts"; + + public static String getDir(String dir, String constant) { + return dir + constant; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java new file mode 100644 index 0000000..446802f --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +import java.util.Set; + +public class Hierarchy { + + private final String name; + + private final Set<ResourceType> resourceTypes; + + private final String type; + + private final String dir; + + private final CgroupCommon rootCgroups; + + public Hierarchy(String name, Set<ResourceType> resourceTypes, String dir) { + this.name = name; + this.resourceTypes = resourceTypes; + this.dir = dir; + this.rootCgroups = new CgroupCommon(this, dir); + this.type = CgroupUtils.reAnalyse(resourceTypes); + } + + public Set<ResourceType> getResourceTypes() { + return resourceTypes; + } + + public String getType() { + return type; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((dir == null) ? 0 : dir.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Hierarchy other = (Hierarchy) obj; + if (dir == null) { + if (other.dir != null) + return false; + } else if (!dir.equals(other.dir)) + return false; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + if (type == null) { + if (other.type != null) + return false; + } else if (!type.equals(other.type)) + return false; + return true; + } + + public String getDir() { + return dir; + } + + public CgroupCommon getRootCgroups() { + return rootCgroups; + } + + public String getName() { + return name; + } + + public boolean subSystemMounted(ResourceType subsystem) { + for (ResourceType type : this.resourceTypes) { + if (type == subsystem) + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java new file mode 100644 index 0000000..c3360e6 --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup; + +public enum ResourceType { + + // net_cls,ns is not supposted in ubuntu + blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio; + + public static ResourceType getResourceType(String str) { + if (str.equals("cpu")) + return cpu; + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java new file mode 100644 index 0000000..39c0999 --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup.core; + +import org.apache.gearpump.cluster.cgroup.ResourceType; + +public interface CgroupCore { + + public ResourceType getType(); +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java new file mode 100644 index 0000000..4b35f7f --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.cgroup.core; + +import org.apache.gearpump.cluster.cgroup.CgroupUtils; +import org.apache.gearpump.cluster.cgroup.Constants; +import org.apache.gearpump.cluster.cgroup.ResourceType; + +import java.io.IOException; + +public class CpuCore implements CgroupCore { + + public static final String CPU_SHARES = "/cpu.shares"; + public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us"; + public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us"; + + private final String dir; + + public CpuCore(String dir) { + this.dir = dir; + } + + @Override + public ResourceType getType() { + // TODO Auto-generated method stub + return ResourceType.cpu; + } + + public void setCpuShares(int weight) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight)); + } + + public int getCpuShares() throws IOException { + return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0)); + } + + public void setCpuCfsPeriodUs(long us) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us)); + } + + public void setCpuCfsQuotaUs(long us) throws IOException { + CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java new file mode 100644 index 0000000..5b2a890 --- /dev/null +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.utils; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class SystemOperation { + + public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class); + + public static void mount(String name, String target, String type, String data) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target); + SystemOperation.exec(sb.toString()); + } + + public static void umount(String name) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append("umount ").append(name); + SystemOperation.exec(sb.toString()); + } + + public static String exec(String cmd) throws IOException { + LOG.debug("Shell cmd: " + cmd); + Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start(); + try { + process.waitFor(); + String output = IOUtils.toString(process.getInputStream()); + String errorOutput = IOUtils.toString(process.getErrorStream()); + LOG.debug("Shell Output: " + output); + if (errorOutput.length() != 0) { + LOG.error("Shell Error Output: " + errorOutput); + throw new IOException(errorOutput); + } + return output; + } catch (InterruptedException ie) { + throw new IOException(ie.toString()); + } + } + + public static void main(String[] args) throws IOException { + SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala deleted file mode 100644 index ae7fb42..0000000 --- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.worker - -import com.typesafe.config.Config -import org.apache.commons.lang.SystemUtils -import org.slf4j.{Logger, LoggerFactory} - -import io.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore} -import io.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, ResourceType} -import io.gearpump.cluster.worker.CGroupManager._ - -class CGroupManager(config: Config) { - private val center = CgroupCenter.getInstance() - private val rootDir = CGroupManager.getCgroupRootDir(config) - private var hierarchy: Hierarchy = null - private var rootCgroup: CgroupCommon = null - - prepareSubSystem() - - private def prepareSubSystem(): Unit = { - if (rootDir == null) { - throw new RuntimeException(s"Check configuration file. The $CGROUP_ROOT is missing.") - } - if (center == null) { - throw new RuntimeException("Cgroup error, please check /proc/cgroups") - } - hierarchy = center.busy(ResourceType.cpu) - if (hierarchy == null) { - val types = new java.util.HashSet[ResourceType] - types.add(ResourceType.cpu) - hierarchy = new Hierarchy(GEARPUMP_HIERARCHY_NAME, types, GEARPUMP_CPU_HIERARCHY_DIR) - } - rootCgroup = new CgroupCommon(rootDir, hierarchy, hierarchy.getRootCgroups) - } - - private def validateCpuUpperLimitValue(value: Int): Int = { - if (value > 10) { - 10 - } else if (value < 1 && value != -1) { - 1 - } else { - value - } - } - - private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): Unit = { - val _cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit) - if (_cpuCoreUpperLimit == -1) { - cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit) - } - else { - cpuCore.setCpuCfsPeriodUs(100000) - cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit * 100000) - } - } - - def startNewExecutor(config: Config, cpuNum: Int, appId: Int, executorId: Int): List[String] = { - val groupName = getGroupName(appId, executorId) - val workerGroup: CgroupCommon = new CgroupCommon(groupName, hierarchy, this.rootCgroup) - this.center.create(workerGroup) - val cpu: CgroupCore = workerGroup.getCores.get(ResourceType.cpu) - val cpuCore: CpuCore = cpu.asInstanceOf[CpuCore] - cpuCore.setCpuShares(cpuNum * CGroupManager.ONE_CPU_SLOT) - setCpuUsageUpperLimit(cpuCore, CGroupManager.getWorkerCpuCoreUpperLimit(config)) - - val sb: StringBuilder = new StringBuilder - sb.append("cgexec -g cpu:").append(workerGroup.getName).toString().split(" ").toList - } - - def shutDownExecutor(appId: Int, executorId: Int): Unit = { - val groupName = getGroupName(appId, executorId) - val workerGroup = new CgroupCommon(groupName, hierarchy, this.rootCgroup) - center.delete(workerGroup) - } - - def close(): Unit = { - center.delete(rootCgroup) - } - - private def getGroupName(appId: Int, executorId: Int): String = { - "app" + appId + "executor" + executorId - } -} - -object CGroupManager { - private val LOG: Logger = LoggerFactory.getLogger(getClass) - private val CGROUP_ROOT = "gearpump.cgroup.root" - private val Executor_CPU_CORE_UPPER_LIMIT = "gearpump.cgroup.cpu-core-limit-per-executor" - private val GEARPUMP_HIERARCHY_NAME = "gearpump_cpu" - private val GEARPUMP_CPU_HIERARCHY_DIR = "/cgroup/cpu" - private val ONE_CPU_SLOT = 1024 - - def getCgroupRootDir(config: Config): String = { - config.getString(CGROUP_ROOT) - } - - def getWorkerCpuCoreUpperLimit(config: Config): Int = { - config.getInt(Executor_CPU_CORE_UPPER_LIMIT) - } - - def getInstance(config: Config): Option[CGroupManager] = { - if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC_OSX) { - LOG.error(s"CGroup is not supported on Windows OS, Mac OS X") - None - } else { - Some(new CGroupManager(config)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala deleted file mode 100644 index eb57a18..0000000 --- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.worker - -import java.io.File -import scala.sys.process.Process - -import com.typesafe.config.Config -import org.slf4j.{Logger, LoggerFactory} - -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.util.{ProcessLogRedirector, RichProcess} - -/** - * CGroupProcessLauncher is used to launch a process for Executor with CGroup. - * For more details, please refer http://gearpump.io - */ -class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher { - private val APP_MASTER = -1 - private val cgroupManager: Option[CGroupManager] = CGroupManager.getInstance(config) - private val LOG: Logger = LoggerFactory.getLogger(getClass) - - override def cleanProcess(appId: Int, executorId: Int): Unit = { - if (executorId != APP_MASTER) { - cgroupManager.foreach(_.shutDownExecutor(appId, executorId)) - } - } - - override def createProcess( - appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String], - classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { - val cgroupCommand = if (executorId != APP_MASTER) { - cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId, - executorId)).getOrElse(List.empty) - } else List.empty - LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " + - s"classpath: ${classPath.mkString(File.pathSeparator)}") - - val java = System.getProperty("java.home") + "/bin/java" - val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath - .mkString(File.pathSeparator), mainClass) ++ arguments - LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; " + - s"options: ${options.mkString(" ")}") - val logger = new ProcessLogRedirector() - val process = Process(command).run(logger) - new RichProcess(process, logger) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala new file mode 100644 index 0000000..24c169c --- /dev/null +++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.worker + +import com.typesafe.config.Config +import org.apache.commons.lang.SystemUtils +import org.slf4j.{Logger, LoggerFactory} + +import org.apache.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore} +import org.apache.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, ResourceType} +import org.apache.gearpump.cluster.worker.CGroupManager._ + +class CGroupManager(config: Config) { + private val center = CgroupCenter.getInstance() + private val rootDir = CGroupManager.getCgroupRootDir(config) + private var hierarchy: Hierarchy = null + private var rootCgroup: CgroupCommon = null + + prepareSubSystem() + + private def prepareSubSystem(): Unit = { + if (rootDir == null) { + throw new RuntimeException(s"Check configuration file. The $CGROUP_ROOT is missing.") + } + if (center == null) { + throw new RuntimeException("Cgroup error, please check /proc/cgroups") + } + hierarchy = center.busy(ResourceType.cpu) + if (hierarchy == null) { + val types = new java.util.HashSet[ResourceType] + types.add(ResourceType.cpu) + hierarchy = new Hierarchy(GEARPUMP_HIERARCHY_NAME, types, GEARPUMP_CPU_HIERARCHY_DIR) + } + rootCgroup = new CgroupCommon(rootDir, hierarchy, hierarchy.getRootCgroups) + } + + private def validateCpuUpperLimitValue(value: Int): Int = { + if (value > 10) { + 10 + } else if (value < 1 && value != -1) { + 1 + } else { + value + } + } + + private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): Unit = { + val _cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit) + if (_cpuCoreUpperLimit == -1) { + cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit) + } + else { + cpuCore.setCpuCfsPeriodUs(100000) + cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit * 100000) + } + } + + def startNewExecutor(config: Config, cpuNum: Int, appId: Int, executorId: Int): List[String] = { + val groupName = getGroupName(appId, executorId) + val workerGroup: CgroupCommon = new CgroupCommon(groupName, hierarchy, this.rootCgroup) + this.center.create(workerGroup) + val cpu: CgroupCore = workerGroup.getCores.get(ResourceType.cpu) + val cpuCore: CpuCore = cpu.asInstanceOf[CpuCore] + cpuCore.setCpuShares(cpuNum * CGroupManager.ONE_CPU_SLOT) + setCpuUsageUpperLimit(cpuCore, CGroupManager.getWorkerCpuCoreUpperLimit(config)) + + val sb: StringBuilder = new StringBuilder + sb.append("cgexec -g cpu:").append(workerGroup.getName).toString().split(" ").toList + } + + def shutDownExecutor(appId: Int, executorId: Int): Unit = { + val groupName = getGroupName(appId, executorId) + val workerGroup = new CgroupCommon(groupName, hierarchy, this.rootCgroup) + center.delete(workerGroup) + } + + def close(): Unit = { + center.delete(rootCgroup) + } + + private def getGroupName(appId: Int, executorId: Int): String = { + "app" + appId + "executor" + executorId + } +} + +object CGroupManager { + private val LOG: Logger = LoggerFactory.getLogger(getClass) + private val CGROUP_ROOT = "gearpump.cgroup.root" + private val Executor_CPU_CORE_UPPER_LIMIT = "gearpump.cgroup.cpu-core-limit-per-executor" + private val GEARPUMP_HIERARCHY_NAME = "gearpump_cpu" + private val GEARPUMP_CPU_HIERARCHY_DIR = "/cgroup/cpu" + private val ONE_CPU_SLOT = 1024 + + def getCgroupRootDir(config: Config): String = { + config.getString(CGROUP_ROOT) + } + + def getWorkerCpuCoreUpperLimit(config: Config): Int = { + config.getInt(Executor_CPU_CORE_UPPER_LIMIT) + } + + def getInstance(config: Config): Option[CGroupManager] = { + if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC_OSX) { + LOG.error(s"CGroup is not supported on Windows OS, Mac OS X") + None + } else { + Some(new CGroupManager(config)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala new file mode 100644 index 0000000..dc2eabd --- /dev/null +++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.worker + +import java.io.File +import scala.sys.process.Process + +import com.typesafe.config.Config +import org.slf4j.{Logger, LoggerFactory} + +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.util.{ProcessLogRedirector, RichProcess} + +/** + * CGroupProcessLauncher is used to launch a process for Executor with CGroup. + * For more details, please refer http://gearpump.io + */ +class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher { + private val APP_MASTER = -1 + private val cgroupManager: Option[CGroupManager] = CGroupManager.getInstance(config) + private val LOG: Logger = LoggerFactory.getLogger(getClass) + + override def cleanProcess(appId: Int, executorId: Int): Unit = { + if (executorId != APP_MASTER) { + cgroupManager.foreach(_.shutDownExecutor(appId, executorId)) + } + } + + override def createProcess( + appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String], + classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { + val cgroupCommand = if (executorId != APP_MASTER) { + cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId, + executorId)).getOrElse(List.empty) + } else List.empty + LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " + + s"classpath: ${classPath.mkString(File.pathSeparator)}") + + val java = System.getProperty("java.home") + "/bin/java" + val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath + .mkString(File.pathSeparator), mainClass) ++ arguments + LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; " + + s"options: ${options.mkString(" ")}") + val logger = new ProcessLogRedirector() + val process = Process(command).run(logger) + new RichProcess(process, logger) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java deleted file mode 100644 index 510258d..0000000 --- a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.util; - -import backtype.storm.utils.TimeCacheMap; - -/** - * Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression. - */ -@SuppressWarnings("deprecation") -public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> { - - public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) { - super(expirationSecs, new ExpiredCallback<K, V>() { - - @Override - public void expire(K key, V val) { - callback.expire(key, val); - } - }); - } - - public static interface Callback<K, V> { - public void expire(K key, V val); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java new file mode 100644 index 0000000..923883c --- /dev/null +++ b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util; + +import backtype.storm.utils.TimeCacheMap; + +/** + * Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression. + */ +@SuppressWarnings("deprecation") +public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> { + + public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) { + super(expirationSecs, new ExpiredCallback<K, V>() { + + @Override + public void expire(K key, V val) { + callback.expire(key, val); + } + }); + } + + public static interface Callback<K, V> { + public void expire(K key, V val); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/resources/geardefault.conf b/experiments/storm/src/main/resources/geardefault.conf index 54c478e..38ac9d3 100644 --- a/experiments/storm/src/main/resources/geardefault.conf +++ b/experiments/storm/src/main/resources/geardefault.conf @@ -1,5 +1,5 @@ gearpump { storm { - serialization-framework = "io.gearpump.experiments.storm.util.StormSerializationFramework" + serialization-framework = "org.apache.gearpump.experiments.storm.util.StormSerializationFramework" } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala deleted file mode 100644 index 19814e9..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm - -import org.slf4j.Logger - -import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient} -import io.gearpump.util.LogUtil - -object StormRunner { - private val LOG: Logger = LogUtil.getLogger(getClass) - - private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient) - - private def usage(): Unit = { - val keys = commands.keys.toList.sorted - // scalastyle:off println - Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") - // scalastyle:on println - } - - private def executeCommand(command: String, commandArgs: Array[String]): Unit = { - if (!commands.contains(command)) { - usage() - } else { - commands(command).main(commandArgs) - } - } - - def main(args: Array[String]): Unit = { - if (args.length == 0) { - usage() - } else { - val command = args(0) - val commandArgs = args.drop(1) - executeCommand(command, commandArgs) - } - } -}
