Addressing comments and adding a bit more documentation.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/251bc569 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/251bc569 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/251bc569 Branch: refs/heads/master Commit: 251bc5691eb83506ba5934724311883486a1c9d2 Parents: 5404023 Author: Kyle Nusbaum <[email protected]> Authored: Thu Mar 17 13:28:19 2016 -0500 Committer: Kyle Nusbaum <[email protected]> Committed: Thu Mar 17 13:28:19 2016 -0500 ---------------------------------------------------------------------- .../org/apache/storm/topology/ResourceDeclarer.java | 4 ++++ .../org/apache/storm/trident/TridentTopology.java | 2 +- .../jvm/org/apache/storm/trident/graph/Group.java | 16 +++++++--------- .../storm/trident/operation/ITridentResource.java | 8 ++++++++ 4 files changed, 20 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java index de530b3..4f648eb 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java +++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java @@ -17,6 +17,10 @@ */ package org.apache.storm.topology; +/** + * This is a new base interface that can be used by anything that wants to mirror + * RAS's basic API. Trident uses this to allow setting resources in the Stream API. 
+ */ public interface ResourceDeclarer <T extends ResourceDeclarer> { T setMemoryLoad(Number onHeap); T setMemoryLoad(Number onHeap, Number offHeap); http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java index 3aefdc5..e0a349b 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java @@ -484,7 +484,7 @@ public class TridentTopology { Right now, this code does not check that. It just takes the max of the summed up resource counts for simplicity's sake. We could perform some more complicated logic to be more accurate, but the benefits are very small, and only apply to some - very odd corner cases. */g + very odd corner cases. 
*/ if(onHeap == null) { onHeap = onHeapDefault; } http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java index a61e3f5..2c92304 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java +++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java @@ -71,16 +71,14 @@ public class Group implements ITridentResource { public Map<String, Number> getResources() { Map<String, Number> ret = new HashMap<>(); for(Node n: nodes) { - if(n instanceof ITridentResource) { - Map<String, Number> res = ((ITridentResource)n).getResources(); - for(Map.Entry<String, Number> kv : res.entrySet()) { - String key = kv.getKey(); - Number val = kv.getValue(); - if(ret.containsKey(key)) { - val = new Double(val.doubleValue() + ret.get(key).doubleValue()); - } - ret.put(key, val); + Map<String, Number> res = n.getResources(); + for(Map.Entry<String, Number> kv : res.entrySet()) { + String key = kv.getKey(); + Number val = kv.getValue(); + if(ret.containsKey(key)) { + val = new Double(val.doubleValue() + ret.get(key).doubleValue()); } + ret.put(key, val); } } return ret; http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java index 4b8a047..b3e10ef 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java @@ -19,6 +19,14 @@ package org.apache.storm.trident.operation; import 
java.util.Map; +/** + * This interface is implemented by various Trident classes in order to + * gather and propagate resources that have been set on them. + * @see ResourceDeclarer + */ public interface ITridentResource { + /** + * @return a map of resource name -> amount of that resource. *Return should never be null!* + */ Map<String, Number> getResources(); }
