Repository: storm Updated Branches: refs/heads/1.x-branch fc64e158f -> 885aaec43
Initial changes. Signed-off-by: Kyle Nusbaum <[email protected]> Ready for PR Signed-off-by: Kyle Nusbaum <[email protected]> Addressing comments. Signed-off-by: Kyle Nusbaum <[email protected]> Addressing Comments. Signed-off-by: Kyle Nusbaum <[email protected]> adding code documentation explaining math of combining component resources. Signed-off-by: Kyle Nusbaum <[email protected]> Addressing comments and adding a bit more documentation. Signed-off-by: Kyle Nusbaum <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0d53be9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0d53be9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0d53be9 Branch: refs/heads/1.x-branch Commit: d0d53be94a967a07ed767ec1c79738649dd70722 Parents: fc64e15 Author: Kyle Nusbaum <[email protected]> Authored: Tue Mar 8 15:46:23 2016 -0600 Committer: Kyle Nusbaum <[email protected]> Committed: Thu Mar 17 15:49:53 2016 -0500 ---------------------------------------------------------------------- .../clj/org/apache/storm/trident/testing.clj | 12 +- .../ComponentConfigurationDeclarer.java | 5 +- .../apache/storm/topology/ResourceDeclarer.java | 28 +++++ .../jvm/org/apache/storm/trident/Stream.java | 31 +++++- .../org/apache/storm/trident/TridentState.java | 27 ++++- .../apache/storm/trident/TridentTopology.java | 91 ++++++++++++++- .../org/apache/storm/trident/graph/Group.java | 22 +++- .../operation/DefaultResourceDeclarer.java | 66 +++++++++++ .../trident/operation/ITridentResource.java | 32 ++++++ .../org/apache/storm/trident/planner/Node.java | 5 +- .../apache/storm/trident/integration_test.clj | 111 ++++++++++++++++--- 11 files changed, 390 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/clj/org/apache/storm/trident/testing.clj 
---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj index 44e5ca9..aafb001 100644 --- a/storm-core/src/clj/org/apache/storm/trident/testing.clj +++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj @@ -55,14 +55,14 @@ (.shutdown ~drpc) )) -(defn with-topology* [cluster topo body-fn] - (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo)) +(defn with-topology* [cluster storm-topo body-fn] + (t/submit-local-topology (:nimbus cluster) "tester" {} storm-topo) (body-fn) - (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))) - ) + (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))) -(defmacro with-topology [[cluster topo] & body] - `(with-topology* ~cluster ~topo (fn [] ~@body))) +(defmacro with-topology [[cluster topo storm-topo] & body] + `(let [~storm-topo (.build ~topo)] + (with-topology* ~cluster ~storm-topo (fn [] ~@body)))) (defn bootstrap-imports [] (import 'org.apache.storm.LocalDRPC) http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java index 328af55..5dc7264 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java +++ b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java @@ -19,14 +19,11 @@ package org.apache.storm.topology; import java.util.Map; -public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> { +public interface ComponentConfigurationDeclarer<T extends 
ComponentConfigurationDeclarer> extends ResourceDeclarer<T> { T addConfigurations(Map<String, Object> conf); T addConfiguration(String config, Object value); T setDebug(boolean debug); T setMaxTaskParallelism(Number val); T setMaxSpoutPending(Number val); T setNumTasks(Number val); - T setMemoryLoad(Number onHeap); - T setMemoryLoad(Number onHeap, Number offHeap); - T setCPULoad(Number amount); } http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java new file mode 100644 index 0000000..4f648eb --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.topology; + +/** + * This is a new base interface that can be used by anything that wants to mirror + * RAS's basic API. Trident uses this to allow setting resources in the Stream API. 
+ */ +public interface ResourceDeclarer <T extends ResourceDeclarer> { + T setMemoryLoad(Number onHeap); + T setMemoryLoad(Number onHeap, Number offHeap); + T setCPULoad(Number amount); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index d313678..4a51b56 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -20,6 +20,7 @@ package org.apache.storm.trident; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.NullStruct; import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.topology.ResourceDeclarer; import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer; import org.apache.storm.trident.fluent.GlobalAggregationScheme; import org.apache.storm.trident.fluent.GroupedStream; @@ -90,7 +91,7 @@ import java.util.Comparator; * */ // TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE) -public class Stream implements IAggregatableStream { +public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { Node _node; TridentTopology _topology; String _name; @@ -124,6 +125,34 @@ public class Stream implements IAggregatableStream { } /** + * Sets the CPU Load resource for the current operation. + */ + @Override + public Stream setCPULoad(Number load) { + _node.setCPULoad(load); + return this; + } + + /** + * Sets the Memory Load resources for the current operation. + * The off-heap load is set to the default from the Storm configuration. + */ + @Override + public Stream setMemoryLoad(Number onHeap) { + _node.setMemoryLoad(onHeap); + return this; + } + + /** + * Sets the Memory Load resources for the current operation.
+ */ + @Override + public Stream setMemoryLoad(Number onHeap, Number offHeap) { + _node.setMemoryLoad(onHeap, offHeap); + return this; + } + + /** * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`. * * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling" http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/TridentState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java index 7173254..18b60e0 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java @@ -17,24 +17,43 @@ */ package org.apache.storm.trident; +import org.apache.storm.topology.ResourceDeclarer; import org.apache.storm.trident.planner.Node; -public class TridentState { +public class TridentState implements ResourceDeclarer<TridentState> { TridentTopology _topology; Node _node; - + protected TridentState(TridentTopology topology, Node node) { _topology = topology; _node = node; } - + public Stream newValuesStream() { return new Stream(_topology, _node.name, _node); } - + public TridentState parallelismHint(int parallelism) { _node.parallelismHint = parallelism; return this; } + + @Override + public TridentState setCPULoad(Number load) { + _node.setCPULoad(load); + return this; + } + + @Override + public TridentState setMemoryLoad(Number onHeap) { + _node.setMemoryLoad(onHeap); + return this; + } + + @Override + public TridentState setMemoryLoad(Number onHeap, Number offHeap) { + _node.setMemoryLoad(onHeap, offHeap); + return this; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java index eb50a10..e0a349b 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java @@ -44,6 +44,7 @@ import org.apache.storm.trident.fluent.UniqueIdGen; import org.apache.storm.trident.graph.GraphGrouper; import org.apache.storm.trident.graph.Group; import org.apache.storm.trident.operation.GroupedMultiReducer; +import org.apache.storm.trident.operation.ITridentResource; import org.apache.storm.trident.operation.MultiReducer; import org.apache.storm.trident.operation.impl.FilterExecutor; import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor; @@ -394,11 +395,23 @@ public class TridentTopology { Map<Node, String> spoutIds = genSpoutIds(spoutNodes); Map<Group, String> boltIds = genBoltIds(mergedGroups); + Map defaults = Utils.readDefaultConfig(); + for(SpoutNode sn: spoutNodes) { Integer parallelism = parallelisms.get(grouper.nodeGroup(sn)); + + Map<String, Number> spoutRes = null; + spoutRes = mergeDefaultResources(sn.getResources(), defaults); + Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + if(sn.type == SpoutNode.SpoutType.DRPC) { + builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId, - (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn)); + (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn)) + .setMemoryLoad(onHeap, offHeap) + .setCPULoad(cpuLoad); } else { ITridentSpout s; if(sn.spout instanceof IBatchSpout) { @@ -409,16 +422,26 @@ public class TridentTopology { throw new RuntimeException("Regular rich spouts not 
supported yet... try wrapping in a RichSpoutBatchExecutor"); // TODO: handle regular rich spout without batches (need lots of updates to support this throughout) } - builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn)); + builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn)) + .setMemoryLoad(onHeap, offHeap) + .setCPULoad(cpuLoad); } } - + for(Group g: mergedGroups) { if(!isSpoutGroup(g)) { Integer p = parallelisms.get(g); Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap); + Map<String, Number> groupRes = mergeDefaultResources(g.getResources(), defaults); + + Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p, - committerBatches(g, batchGroupMap), streamToGroup); + committerBatches(g, batchGroupMap), streamToGroup) + .setMemoryLoad(onHeap, offHeap) + .setCPULoad(cpuLoad); Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g)); for(PartitionNode n: inputs) { Node parent = TridentUtils.getParent(graph, n); @@ -431,6 +454,64 @@ public class TridentTopology { return builder.buildTopology(); } + + private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) { + Map<String, Number> ret = new HashMap<String, Number>(); + + Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + + if(res == null) { + 
ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeapDefault); + ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeapDefault); + ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoadDefault); + return ret; + } + + Number onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + + /* We take the max of the default and whatever the user put in here. + Each node's resources can be the sum of several operations, so the simplest + thing to do is get the max. + + The situation we want to avoid is that the user sets low resources on one + node, and when that node is combined with a bunch of others, the sum is still + that low resource count. If any component isn't set, we want to use the default. + + Right now, this code does not check that. It just takes the max of the summed + up resource counts for simplicity's sake. We could perform some more complicated + logic to be more accurate, but the benefits are very small, and only apply to some + very odd corner cases. 
*/ + if(onHeap == null) { + onHeap = onHeapDefault; + } + else { + onHeap = Math.max(onHeap.doubleValue(), onHeapDefault.doubleValue()); + } + + if(offHeap == null) { + offHeap = offHeapDefault; + } + else { + offHeap = Math.max(offHeap.doubleValue(), offHeapDefault.doubleValue()); + } + + if(cpuLoad == null) { + cpuLoad = cpuLoadDefault; + } + else { + cpuLoad = Math.max(cpuLoad.doubleValue(), cpuLoadDefault.doubleValue()); + } + + ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); + ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); + ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoad); + + return ret; + } private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) { List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets(); @@ -464,7 +545,7 @@ public class TridentTopology { } return ret; } - + //returns null if it's not a drpc group private static SpoutNode getDRPCSpoutNode(Collection<Node> g) { for(Node n: g) { http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java index ef1399b..2c92304 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java +++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java @@ -18,17 +18,20 @@ package org.apache.storm.trident.graph; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.jgrapht.DirectedGraph; +import org.apache.storm.trident.operation.ITridentResource; import org.apache.storm.trident.planner.Node; import org.apache.storm.trident.util.IndexedEdge; 
import org.apache.storm.trident.util.TridentUtils; -public class Group { +public class Group implements ITridentResource { public final Set<Node> nodes = new HashSet<>(); private final DirectedGraph<Node, IndexedEdge> graph; private final String id = UUID.randomUUID().toString(); @@ -65,6 +68,23 @@ public class Group { } @Override + public Map<String, Number> getResources() { + Map<String, Number> ret = new HashMap<>(); + for(Node n: nodes) { + Map<String, Number> res = n.getResources(); + for(Map.Entry<String, Number> kv : res.entrySet()) { + String key = kv.getKey(); + Number val = kv.getValue(); + if(ret.containsKey(key)) { + val = new Double(val.doubleValue() + ret.get(key).doubleValue()); + } + ret.put(key, val); + } + } + return ret; + } + + @Override public int hashCode() { return id.hashCode(); } http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java new file mode 100644 index 0000000..d49011a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.operation; + +import java.util.HashMap; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.apache.storm.topology.ResourceDeclarer; + +/** + * @param <T> Must always be the type of the extending class, e.g. + * {@code public class SubResourceDeclarer extends DefaultResourceDeclarer<SubResourceDeclarer> {...}} + */ +public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource { + + private Map<String, Number> resources = new HashMap<>(); + private Map conf = Utils.readStormConfig(); + + @Override + public T setMemoryLoad(Number onHeap) { + return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB))); + } + + @Override + public T setMemoryLoad(Number onHeap, Number offHeap) { + if (onHeap != null) { + onHeap = onHeap.doubleValue(); + resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); + } + if (offHeap!=null) { + offHeap = offHeap.doubleValue(); + resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); + } + return (T)this; + } + + @Override + public T setCPULoad(Number amount) { + if(amount != null) { + amount = amount.doubleValue(); + resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount); + } + return (T)this; + } + + @Override + public Map<String, Number> getResources() { + return new HashMap<String, Number>(resources); + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java new file mode 100644 index 0000000..b3e10ef --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.operation; + +import java.util.Map; + +/** + * This interface is implemented by various Trident classes in order to + * gather and propagate resources that have been set on them. + * @see org.apache.storm.topology.ResourceDeclarer + */ +public interface ITridentResource { + /** + * @return a map of resource name -> amount of that resource.
*Return should never be null!* + */ + Map<String, Number> getResources(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java index 64d8a3b..b2466e6 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java +++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java @@ -17,6 +17,7 @@ */ package org.apache.storm.trident.planner; +import org.apache.storm.trident.operation.DefaultResourceDeclarer; import org.apache.storm.tuple.Fields; import java.io.Serializable; import java.util.UUID; @@ -25,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; -public class Node implements Serializable { +public class Node extends DefaultResourceDeclarer<Node> implements Serializable { private static final AtomicInteger INDEX = new AtomicInteger(0); private String nodeId; @@ -62,6 +63,4 @@ public class Node implements Serializable { public String toString() { return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); } - - } http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj index 4c52286..7f81c4b 100644 --- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj @@ -19,10 +19,13 @@ (:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength 
TrueFilter MemoryMapState$Factory]) (:import [org.apache.storm.trident.state StateSpec]) - (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]) - (:use [org.apache.storm.trident testing]) - (:use [org.apache.storm util])) - + (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater] + [org.apache.storm.trident.operation BaseFunction] + [org.json.simple.parser JSONParser] + [org.apache.storm Config]) + (:use [org.apache.storm.trident testing] + [org.apache.storm log util config])) + (bootstrap-imports) (deftest test-memory-map-get-tuples @@ -38,13 +41,13 @@ (.groupBy (fields "word")) (.persistentAggregate (memory-map-state) (Count.) (fields "count")) (.parallelismHint 6) - )) + )) (-> topo (.newDRPCStream "all-tuples" drpc) (.broadcast) (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count")) (.project (fields "word" "count"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (feed feeder [["hello the man said"] ["the"]]) (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]} (into #{} (exec-drpc drpc "all-tuples" "man")))) @@ -73,7 +76,7 @@ (.stateQuery word-counts (fields "word") (MapGet.) (fields "count")) (.aggregate (fields "count") (Sum.) (fields "sum")) (.project (fields "sum"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (feed feeder [["hello the man said"] ["the"]]) (is (= [[2]] (exec-drpc drpc "words" "the"))) (is (= [[1]] (exec-drpc drpc "words" "hello"))) @@ -83,7 +86,7 @@ (is (= [[8]] (exec-drpc drpc "words" "man where you the"))) ))))) -;; this test reproduces a bug where committer spouts freeze processing when +;; this test reproduces a bug where committer spouts freeze processing when ;; there's at least one repartitioning after the spout (deftest test-word-count-committer-spout (t/with-local-cluster [cluster] @@ -108,7 +111,7 @@ (.stateQuery word-counts (fields "word") (MapGet.) 
(fields "count")) (.aggregate (fields "count") (Sum.) (fields "sum")) (.project (fields "sum"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (feed feeder [["hello the man said"] ["the"]]) (is (= [[2]] (exec-drpc drpc "words" "the"))) (is (= [[1]] (exec-drpc drpc "words" "hello"))) @@ -135,13 +138,13 @@ (.aggregate (CountAsAggregator.) (fields "count")) (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly (.project (fields "count"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (doseq [i (range 100)] (is (= [[1]] (exec-drpc drpc "numwords" "the")))) (is (= [[0]] (exec-drpc drpc "numwords" ""))) (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8"))) ))))) - + (deftest test-split-merge (t/with-local-cluster [cluster] (with-drpc [drpc] @@ -158,7 +161,7 @@ (.project (fields "len")))) (.merge topo [s1 s2]) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man"))) (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello"))) ))))) @@ -180,11 +183,11 @@ (.aggregate (CountAsAggregator.) (fields "count")))) (.merge topo [s1 s2]) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the"))) (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa"))) ))))) - + (deftest test-multi-repartition (t/with-local-cluster [cluster] (with-drpc [drpc] @@ -196,7 +199,7 @@ (.shuffle) (.aggregate (CountAsAggregator.) (fields "count")) )) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man"))) (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa"))) ))))) @@ -270,6 +273,82 @@ (.stateQuery word-counts (fields "word1") (MapGet.) 
(fields "count")))))) ))) + +(deftest test-set-component-resources + (t/with-local-cluster [cluster] + (with-drpc [drpc] + (letlocals + (bind topo (TridentTopology.)) + (bind feeder (feeder-spout ["sentence"])) + (bind add-bang (proxy [BaseFunction] [] + (execute [tuple collector] + (. collector emit (str (. tuple getString 0) "!"))))) + (bind word-counts + (.. topo + (newStream "words" feeder) + (parallelismHint 5) + (setCPULoad 20) + (setMemoryLoad 512 256) + (each (fields "sentence") (Split.) (fields "word")) + (setCPULoad 10) + (setMemoryLoad 512) + (each (fields "word") add-bang (fields "word!")) + (parallelismHint 10) + (setCPULoad 50) + (setMemoryLoad 1024) + (groupBy (fields "word!")) + (persistentAggregate (memory-map-state) (Count.) (fields "count")) + (setCPULoad 100) + (setMemoryLoad 2048))) + (with-topology [cluster topo storm-topo] + + (let [parse-fn (fn [[k v]] + [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))]) + json-confs (into {} (map parse-fn (. 
storm-topo get_bolts)))] + (testing "spout memory" + (is (= (-> (json-confs "spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + 512.0)) + + (is (= (-> (json-confs "spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB)) + 256.0)) + + (is (= (-> (json-confs "$spoutcoord-spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + 512.0)) + + (is (= (-> (json-confs "$spoutcoord-spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB)) + 256.0))) + + (testing "spout CPU" + (is (= (-> (json-confs "spout-words") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 20.0)) + + (is (= (-> (json-confs "$spoutcoord-spout-words") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 20.0))) + + (testing "bolt combinations" + (is (= (-> (json-confs "b-1") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + (+ 1024.0 512.0))) + + (is (= (-> (json-confs "b-1") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 60.0))) + + (testing "aggregations after partition" + (is (= (-> (json-confs "b-0") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + 2048.0)) + + (is (= (-> (json-confs "b-0") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 100.0))))))))) + ;; (deftest test-split-merge ;; (t/with-local-cluster [cluster] ;; (with-drpc [drpc] @@ -284,7 +363,7 @@ ;; (-> drpc-stream ;; (.each (fields "args") (StringLength.) (fields "len")) ;; (.project (fields "len")))) -;; +;; ;; (.merge topo [s1 s2]) ;; (with-topology [cluster topo] ;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
