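Address review comments: make Stream and TridentState implement ResourceDeclarer, and parameterize DefaultResourceDeclarer by its subclass type.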
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/178dd546 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/178dd546 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/178dd546 Branch: refs/heads/master Commit: 178dd5464790cb8a01c09b43f04ce7dfbc61a332 Parents: d3323f3 Author: Kyle Nusbaum <[email protected]> Authored: Wed Mar 16 15:33:03 2016 -0500 Committer: Kyle Nusbaum <[email protected]> Committed: Wed Mar 16 15:33:03 2016 -0500 ---------------------------------------------------------------------- .../src/jvm/org/apache/storm/trident/Stream.java | 6 +++++- .../jvm/org/apache/storm/trident/TridentState.java | 6 +++++- .../org/apache/storm/trident/TridentTopology.java | 4 ++-- .../trident/operation/DefaultResourceDeclarer.java | 16 ++++++++++------ .../jvm/org/apache/storm/trident/planner/Node.java | 2 +- 5 files changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index b680977..4a51b56 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -20,6 +20,7 @@ package org.apache.storm.trident; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.NullStruct; import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.topology.ResourceDeclarer; import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer; import org.apache.storm.trident.fluent.GlobalAggregationScheme; import org.apache.storm.trident.fluent.GroupedStream; @@ -90,7 +91,7 @@ import java.util.Comparator; * */ // TODO: need to be able 
to replace existing fields with the function fields (like Cascading Fields.REPLACE) -public class Stream implements IAggregatableStream { +public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { Node _node; TridentTopology _topology; String _name; @@ -126,6 +127,7 @@ public class Stream implements IAggregatableStream { /** * Sets the CPU Load resource for the current operation */ + @Override public Stream setCPULoad(Number load) { _node.setCPULoad(load); return this; @@ -135,6 +137,7 @@ public class Stream implements IAggregatableStream { * Sets the Memory Load resources for the current operation. * offHeap becomes default */ + @Override public Stream setMemoryLoad(Number onHeap) { _node.setMemoryLoad(onHeap); return this; @@ -143,6 +146,7 @@ public class Stream implements IAggregatableStream { /** * Sets the Memory Load resources for the current operation. */ + @Override public Stream setMemoryLoad(Number onHeap, Number offHeap) { _node.setMemoryLoad(onHeap, offHeap); return this; http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/TridentState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java index fafd5f9..18b60e0 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java @@ -17,10 +17,11 @@ */ package org.apache.storm.trident; +import org.apache.storm.topology.ResourceDeclarer; import org.apache.storm.trident.planner.Node; -public class TridentState { +public class TridentState implements ResourceDeclarer<TridentState> { TridentTopology _topology; Node _node; @@ -38,16 +39,19 @@ public class TridentState { return this; } + @Override public TridentState setCPULoad(Number load) { _node.setCPULoad(load); return this; } + @Override public 
TridentState setMemoryLoad(Number onHeap) { _node.setMemoryLoad(onHeap); return this; } + @Override public TridentState setMemoryLoad(Number onHeap, Number offHeap) { _node.setMemoryLoad(onHeap, offHeap); return this; http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java index ccf01dd..6a4e92f 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java @@ -458,7 +458,7 @@ public class TridentTopology { private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) { Map<String, Number> ret = new HashMap<String, Number>(); - Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); @@ -469,7 +469,7 @@ public class TridentTopology { return ret; } - Number onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + Number onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java index 72ca27e..d49011a 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java @@ -23,18 +23,22 @@ import org.apache.storm.Config; import org.apache.storm.utils.Utils; import org.apache.storm.topology.ResourceDeclarer; -public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource { +/** + * @param T Must always be the type of the extending class. i.e. + * public class SubResourceDeclarer extends DefaultResourceDeclarer<SubResourceDeclarer> {...} + */ +public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource { private Map<String, Number> resources = new HashMap<>(); private Map conf = Utils.readStormConfig(); @Override - public DefaultResourceDeclarer setMemoryLoad(Number onHeap) { + public T setMemoryLoad(Number onHeap) { return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB))); } @Override - public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number offHeap) { + public T setMemoryLoad(Number onHeap, Number offHeap) { if (onHeap != null) { onHeap = onHeap.doubleValue(); resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); @@ -43,16 +47,16 @@ public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResour offHeap = offHeap.doubleValue(); resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); } - return this; + return (T)this; } @Override - public DefaultResourceDeclarer setCPULoad(Number amount) { + public T setCPULoad(Number amount) { if(amount != null) { amount = amount.doubleValue(); resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount); } - return 
this; + return (T)this; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java index e39ec50..b2466e6 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java +++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java @@ -26,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; -public class Node extends DefaultResourceDeclarer implements Serializable { +public class Node extends DefaultResourceDeclarer<Node> implements Serializable { private static final AtomicInteger INDEX = new AtomicInteger(0); private String nodeId;
