Repository: storm Updated Branches: refs/heads/master 50701df4a -> 367464a3d
Initial changes. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17a55d20 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17a55d20 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17a55d20 Branch: refs/heads/master Commit: 17a55d20d3f20bc04ca48ee3a9f63eaed8960b9c Parents: b477939 Author: Kyle Nusbaum <[email protected]> Authored: Tue Mar 8 15:46:23 2016 -0600 Committer: Kyle Nusbaum <[email protected]> Committed: Tue Mar 8 15:46:23 2016 -0600 ---------------------------------------------------------------------- .../ComponentConfigurationDeclarer.java | 5 +- .../apache/storm/topology/ResourceDeclarer.java | 24 ++++++ .../apache/storm/trident/TridentTopology.java | 84 ++++++++++++++++++-- .../org/apache/storm/trident/graph/Group.java | 24 +++++- .../operation/DefaultResourceDeclarer.java | 62 +++++++++++++++ .../trident/operation/ITridentResource.java | 24 ++++++ 6 files changed, 213 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java index 328af55..5dc7264 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java +++ b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java @@ -19,14 +19,11 @@ package org.apache.storm.topology; import java.util.Map; -public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> { +public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> extends ResourceDeclarer<T> { T addConfigurations(Map<String, Object> conf); T addConfiguration(String config, Object value); T setDebug(boolean debug); T setMaxTaskParallelism(Number val); T setMaxSpoutPending(Number val); T setNumTasks(Number val); - T setMemoryLoad(Number onHeap); - T setMemoryLoad(Number onHeap, Number offHeap); - T setCPULoad(Number amount); } http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java new file mode 100644 index 0000000..de530b3 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.topology; + +public interface ResourceDeclarer <T extends ResourceDeclarer> { + T setMemoryLoad(Number onHeap); + T setMemoryLoad(Number onHeap, Number offHeap); + T setCPULoad(Number amount); +} http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java index eb50a10..3836663 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java @@ -44,6 +44,7 @@ import org.apache.storm.trident.fluent.UniqueIdGen; import org.apache.storm.trident.graph.GraphGrouper; import org.apache.storm.trident.graph.Group; import org.apache.storm.trident.operation.GroupedMultiReducer; +import org.apache.storm.trident.operation.ITridentResource; import org.apache.storm.trident.operation.MultiReducer; import org.apache.storm.trident.operation.impl.FilterExecutor; import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor; @@ -394,11 +395,28 @@ public class TridentTopology { Map<Node, String> spoutIds = genSpoutIds(spoutNodes); Map<Group, String> boltIds = genBoltIds(mergedGroups); + Map defaults = Utils.readDefaultConfig(); + for(SpoutNode sn: spoutNodes) { Integer parallelism = parallelisms.get(grouper.nodeGroup(sn)); + + Map<String, Number> spoutRes = null; + if(sn instanceof ITridentResource) { + spoutRes = mergeDefaultResources(((ITridentResource)sn).getResources(), defaults); + } + else { + spoutRes = mergeDefaultResources(null, defaults); + } + Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + if(sn.type == SpoutNode.SpoutType.DRPC) { + builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId, - (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn)); + (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn)) + .setMemoryLoad(onHeap, offHeap) + .setCPULoad(cpuLoad); } else { ITridentSpout s; if(sn.spout instanceof IBatchSpout) { @@ -409,16 +427,26 @@ public class TridentTopology { throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor"); // TODO: handle regular rich spout without batches (need lots of updates to support this throughout) } - builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn)); + builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn)) + .setMemoryLoad(onHeap, offHeap) + .setCPULoad(cpuLoad); } } - + for(Group g: mergedGroups) { if(!isSpoutGroup(g)) { Integer p = parallelisms.get(g); Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap); + Map<String, Number> groupRes = mergeDefaultResources(g.getResources(), defaults); + + Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p, - committerBatches(g, batchGroupMap), streamToGroup); + committerBatches(g, batchGroupMap), streamToGroup) + .setMemoryLoad(onHeap, offHeap) + .setCPULoad(cpuLoad); Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g)); for(PartitionNode n: inputs) { Node parent = TridentUtils.getParent(graph, n); @@ -431,6 +459,52 @@ public class TridentTopology { return builder.buildTopology(); } + + private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) { + Map<String, Number> ret = new HashMap<String, Number>(); + + Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + + if(res == null) { + ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeapDefault); + ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeapDefault); + ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoadDefault); + return ret; + } + + Number onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + + if(onHeap == null) { + onHeap = onHeapDefault; + } + else { + onHeap = Math.max(onHeap.doubleValue(), onHeapDefault.doubleValue()); + } + + if(offHeap == null) { + offHeap = offHeapDefault; + } + else { + offHeap = Math.max(offHeap.doubleValue(), offHeapDefault.doubleValue()); + } + + if(cpuLoad == null) { + cpuLoad = cpuLoadDefault; + } + else { + cpuLoad = Math.max(cpuLoad.doubleValue(), cpuLoadDefault.doubleValue()); + } + + ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); + ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); + ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoad); + + return ret; + } private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) { List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets(); @@ -464,7 +538,7 @@ public class TridentTopology { } return ret; } - + //returns null if it's not a drpc group private static SpoutNode getDRPCSpoutNode(Collection<Node> g) { for(Node n: g) { http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java index ef1399b..a61e3f5 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java +++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java @@ -18,17 +18,20 @@ package org.apache.storm.trident.graph; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.jgrapht.DirectedGraph; +import org.apache.storm.trident.operation.ITridentResource; import org.apache.storm.trident.planner.Node; import org.apache.storm.trident.util.IndexedEdge; import org.apache.storm.trident.util.TridentUtils; -public class Group { +public class Group implements ITridentResource { public final Set<Node> nodes = new HashSet<>(); private final DirectedGraph<Node, IndexedEdge> graph; private final String id = UUID.randomUUID().toString(); @@ -65,6 +68,25 @@ public class Group { } @Override + public Map<String, Number> getResources() { + Map<String, Number> ret = new HashMap<>(); + for(Node n: nodes) { + if(n instanceof ITridentResource) { + Map<String, Number> res = ((ITridentResource)n).getResources(); + for(Map.Entry<String, Number> kv : res.entrySet()) { + String key = kv.getKey(); + Number val = kv.getValue(); + if(ret.containsKey(key)) { + val = new Double(val.doubleValue() + ret.get(key).doubleValue()); + } + ret.put(key, val); + } + } + } + return ret; + } + + @Override public int hashCode() { return id.hashCode(); } http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java new file mode 100644 index 0000000..72ca27e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.operation; + +import java.util.HashMap; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.apache.storm.topology.ResourceDeclarer; + +public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource { + + private Map<String, Number> resources = new HashMap<>(); + private Map conf = Utils.readStormConfig(); + + @Override + public DefaultResourceDeclarer setMemoryLoad(Number onHeap) { + return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB))); + } + + @Override + public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number offHeap) { + if (onHeap != null) { + onHeap = onHeap.doubleValue(); + resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); + } + if (offHeap!=null) { + offHeap = offHeap.doubleValue(); + resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); + } + return this; + } + + @Override + public DefaultResourceDeclarer setCPULoad(Number amount) { + if(amount != null) { + amount = amount.doubleValue(); + resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount); + } + return this; + } + + @Override + public Map<String, Number> getResources() { + return new HashMap<String, Number>(resources); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java new file mode 100644 index 0000000..4b8a047 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.operation; + +import java.util.Map; + +public interface ITridentResource { + Map<String, Number> getResources(); +}
