Repository: storm
Updated Branches:
  refs/heads/master 50701df4a -> 367464a3d


Initial changes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17a55d20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17a55d20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17a55d20

Branch: refs/heads/master
Commit: 17a55d20d3f20bc04ca48ee3a9f63eaed8960b9c
Parents: b477939
Author: Kyle Nusbaum <[email protected]>
Authored: Tue Mar 8 15:46:23 2016 -0600
Committer: Kyle Nusbaum <[email protected]>
Committed: Tue Mar 8 15:46:23 2016 -0600

----------------------------------------------------------------------
 .../ComponentConfigurationDeclarer.java         |  5 +-
 .../apache/storm/topology/ResourceDeclarer.java | 24 ++++++
 .../apache/storm/trident/TridentTopology.java   | 84 ++++++++++++++++++--
 .../org/apache/storm/trident/graph/Group.java   | 24 +++++-
 .../operation/DefaultResourceDeclarer.java      | 62 +++++++++++++++
 .../trident/operation/ITridentResource.java     | 24 ++++++
 6 files changed, 213 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
index 328af55..5dc7264 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
@@ -19,14 +19,11 @@ package org.apache.storm.topology;
 
 import java.util.Map;
 
-public interface ComponentConfigurationDeclarer<T extends 
ComponentConfigurationDeclarer> {
+public interface ComponentConfigurationDeclarer<T extends 
ComponentConfigurationDeclarer> extends ResourceDeclarer<T> {
     T addConfigurations(Map<String, Object> conf);
     T addConfiguration(String config, Object value);
     T setDebug(boolean debug);
     T setMaxTaskParallelism(Number val);
     T setMaxSpoutPending(Number val);
     T setNumTasks(Number val);
-    T setMemoryLoad(Number onHeap);
-    T setMemoryLoad(Number onHeap, Number offHeap);
-    T setCPULoad(Number amount);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
new file mode 100644
index 0000000..de530b3
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+/**
+ * Fluent contract for declaring the resources a component needs.
+ *
+ * <p>Each setter returns {@code T}; the callers in this change chain the
+ * calls ({@code .setMemoryLoad(onHeap, offHeap).setCPULoad(cpuLoad)}).
+ * NOTE(review): units are presumably megabytes for memory and percent of a
+ * core for CPU, judging by the {@code *_MEMORY_MB} and
+ * {@code *_CPU_PCORE_PERCENT} config keys used by the implementations --
+ * confirm against the scheduler documentation.
+ *
+ * @param <T> the type returned by every setter
+ */
+public interface ResourceDeclarer <T extends ResourceDeclarer> {
+    /** Declares the on-heap memory load only. */
+    T setMemoryLoad(Number onHeap);
+    /** Declares both the on-heap and the off-heap memory load. */
+    T setMemoryLoad(Number onHeap, Number offHeap);
+    /** Declares the CPU load. */
+    T setCPULoad(Number amount);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index eb50a10..3836663 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -44,6 +44,7 @@ import org.apache.storm.trident.fluent.UniqueIdGen;
 import org.apache.storm.trident.graph.GraphGrouper;
 import org.apache.storm.trident.graph.Group;
 import org.apache.storm.trident.operation.GroupedMultiReducer;
+import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.operation.MultiReducer;
 import org.apache.storm.trident.operation.impl.FilterExecutor;
 import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor;
@@ -394,11 +395,28 @@ public class TridentTopology {
         Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
         Map<Group, String> boltIds = genBoltIds(mergedGroups);
         
+        Map defaults = Utils.readDefaultConfig();
+
         for(SpoutNode sn: spoutNodes) {
             Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
+
+            Map<String, Number> spoutRes = null;
+            if(sn instanceof ITridentResource) {
+                spoutRes = 
mergeDefaultResources(((ITridentResource)sn).getResources(), defaults);
+            }
+            else {
+                spoutRes = mergeDefaultResources(null, defaults);
+            }
+            Number onHeap = 
spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            Number offHeap = 
spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+            Number cpuLoad = 
spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
             if(sn.type == SpoutNode.SpoutType.DRPC) {
+
                 builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
-                        (IRichSpout) sn.spout, parallelism, 
batchGroupMap.get(sn));
+                                              (IRichSpout) sn.spout, 
parallelism, batchGroupMap.get(sn))
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
             } else {
                 ITridentSpout s;
                 if(sn.spout instanceof IBatchSpout) {
@@ -409,16 +427,26 @@ public class TridentTopology {
                     throw new RuntimeException("Regular rich spouts not 
supported yet... try wrapping in a RichSpoutBatchExecutor");
                     // TODO: handle regular rich spout without batches (need 
lots of updates to support this throughout)
                 }
-                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, 
parallelism, batchGroupMap.get(sn));
+                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, 
parallelism, batchGroupMap.get(sn))
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
             }
         }
-        
+
         for(Group g: mergedGroups) {
             if(!isSpoutGroup(g)) {
                 Integer p = parallelisms.get(g);
                 Map<String, String> streamToGroup = 
getOutputStreamBatchGroups(g, batchGroupMap);
+                Map<String, Number> groupRes = 
mergeDefaultResources(g.getResources(), defaults);
+
+                Number onHeap = 
groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+                Number offHeap = 
groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+                Number cpuLoad = 
groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
                 BoltDeclarer d = builder.setBolt(boltIds.get(g), new 
SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
-                        committerBatches(g, batchGroupMap), streamToGroup);
+                                                 committerBatches(g, 
batchGroupMap), streamToGroup)
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
                 Collection<PartitionNode> inputs = 
uniquedSubscriptions(externalGroupInputs(g));
                 for(PartitionNode n: inputs) {
                     Node parent = TridentUtils.getParent(graph, n);
@@ -431,6 +459,52 @@ public class TridentTopology {
         
         return builder.buildTopology();
     }
+
+    /**
+     * Combines the resources declared by a component with the default
+     * configuration.
+     *
+     * <p>The result always contains exactly three keys: on-heap memory,
+     * off-heap memory and CPU load. For each key the declared value is used
+     * unless it is missing, in which case the default is used; when both are
+     * present the larger of the two wins, so a declaration never goes below
+     * the default.
+     *
+     * @param res           resources declared by the component, or {@code null}
+     *                      if it declared none
+     * @param defaultConfig configuration map the defaults are read from
+     * @return a new map holding the effective value for each of the three keys
+     *         (a value is {@code null} only when neither side provides one)
+     */
+    private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
+        String[] resourceKeys = {
+            Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
+            Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
+            Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT
+        };
+
+        Map<String, Number> ret = new HashMap<String, Number>();
+        for (String key : resourceKeys) {
+            Number declared = (res == null) ? null : res.get(key);
+            Number fallback = (Number) defaultConfig.get(key);
+            ret.put(key, atLeastDefault(declared, fallback));
+        }
+        return ret;
+    }
+
+    /**
+     * Picks the effective value for one resource: the other argument when one
+     * of them is {@code null}, otherwise the larger of the two as a double.
+     */
+    private static Number atLeastDefault(Number declared, Number fallback) {
+        if (declared == null) {
+            return fallback;
+        }
+        if (fallback == null) {
+            // No default configured for this key: keep the declaration
+            // instead of failing with a NullPointerException.
+            return declared;
+        }
+        return Math.max(declared.doubleValue(), fallback.doubleValue());
+    }
     
     private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> 
graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
         List<Set<Node>> connectedComponents = new 
ConnectivityInspector<>(graph).connectedSets();
@@ -464,7 +538,7 @@ public class TridentTopology {
         }
         return ret;
     }
-    
+
     //returns null if it's not a drpc group
     private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
         for(Node n: g) {

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
index ef1399b..a61e3f5 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
@@ -18,17 +18,20 @@
 package org.apache.storm.trident.graph;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.jgrapht.DirectedGraph;
+import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.planner.Node;
 import org.apache.storm.trident.util.IndexedEdge;
 import org.apache.storm.trident.util.TridentUtils;
 
 
-public class Group {
+public class Group implements ITridentResource {
     public final Set<Node> nodes = new HashSet<>();
     private final DirectedGraph<Node, IndexedEdge> graph;
     private final String id = UUID.randomUUID().toString();
@@ -65,6 +68,25 @@ public class Group {
     }
 
     @Override
+    public Map<String, Number> getResources() {
+        Map<String, Number> ret = new HashMap<>();
+        for(Node n: nodes) {
+            if(n instanceof ITridentResource) {
+                Map<String, Number> res = ((ITridentResource)n).getResources();
+                for(Map.Entry<String, Number> kv : res.entrySet()) {
+                    String key = kv.getKey();
+                    Number val = kv.getValue();
+                    if(ret.containsKey(key)) {
+                        val = new Double(val.doubleValue() + 
ret.get(key).doubleValue());
+                    }
+                    ret.put(key, val);
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
     public int hashCode() {
         return id.hashCode();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
new file mode 100644
index 0000000..72ca27e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.topology.ResourceDeclarer;
+
+/**
+ * Simple {@link ResourceDeclarer} that records the declared resources in a map
+ * and exposes them through {@link ITridentResource#getResources()}.
+ *
+ * <p>Only values that were explicitly declared are stored; a {@code null}
+ * argument leaves the corresponding entry untouched. Values are stored as
+ * doubles under the matching {@link Config} key.
+ */
+public class DefaultResourceDeclarer implements ResourceDeclarer<DefaultResourceDeclarer>, ITridentResource {
+
+    private final Map<String, Number> resources = new HashMap<>();
+
+    /**
+     * Declares the on-heap memory load and leaves any previously declared
+     * off-heap value as it is.
+     *
+     * @param onHeap on-heap memory load, or {@code null} to change nothing
+     * @return this declarer
+     */
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap) {
+        // Passing null for off-heap keeps an earlier explicit declaration
+        // instead of overwriting it with a configuration default.
+        return setMemoryLoad(onHeap, null);
+    }
+
+    /**
+     * Declares the on-heap and off-heap memory load.
+     *
+     * @param onHeap  on-heap memory load, or {@code null} to leave it unchanged
+     * @param offHeap off-heap memory load, or {@code null} to leave it unchanged
+     * @return this declarer
+     */
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number offHeap) {
+        if (onHeap != null) {
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap.doubleValue());
+        }
+        if (offHeap != null) {
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap.doubleValue());
+        }
+        return this;
+    }
+
+    /**
+     * Declares the CPU load.
+     *
+     * @param amount CPU load, or {@code null} to leave it unchanged
+     * @return this declarer
+     */
+    @Override
+    public DefaultResourceDeclarer setCPULoad(Number amount) {
+        if (amount != null) {
+            resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount.doubleValue());
+        }
+        return this;
+    }
+
+    /**
+     * Returns a copy of the declared resources, so callers cannot modify the
+     * internal state.
+     */
+    @Override
+    public Map<String, Number> getResources() {
+        return new HashMap<String, Number>(resources);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
new file mode 100644
index 0000000..4b8a047
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.Map;
+
+/**
+ * Implemented by Trident elements that can report the resources they declare.
+ */
+public interface ITridentResource {
+    /**
+     * Returns the declared resources keyed by configuration key (the callers
+     * in this change look up the on-heap memory, off-heap memory and CPU
+     * {@code Config} keys).
+     */
+    Map<String, Number> getResources();
+}

Reply via email to