Addressing Comments.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/178dd546
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/178dd546
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/178dd546

Branch: refs/heads/master
Commit: 178dd5464790cb8a01c09b43f04ce7dfbc61a332
Parents: d3323f3
Author: Kyle Nusbaum <[email protected]>
Authored: Wed Mar 16 15:33:03 2016 -0500
Committer: Kyle Nusbaum <[email protected]>
Committed: Wed Mar 16 15:33:03 2016 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/trident/Stream.java    |  6 +++++-
 .../jvm/org/apache/storm/trident/TridentState.java  |  6 +++++-
 .../org/apache/storm/trident/TridentTopology.java   |  4 ++--
 .../trident/operation/DefaultResourceDeclarer.java  | 16 ++++++++++------
 .../jvm/org/apache/storm/trident/planner/Node.java  |  2 +-
 5 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java 
b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index b680977..4a51b56 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -20,6 +20,7 @@ package org.apache.storm.trident;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.ResourceDeclarer;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
 import org.apache.storm.trident.fluent.GroupedStream;
@@ -90,7 +91,7 @@ import java.util.Comparator;
  *
  */
 // TODO: need to be able to replace existing fields with the function fields 
(like Cascading Fields.REPLACE)
-public class Stream implements IAggregatableStream {
+public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     Node _node;
     TridentTopology _topology;
     String _name;
@@ -126,6 +127,7 @@ public class Stream implements IAggregatableStream {
     /**
      * Sets the CPU Load resource for the current operation
      */
+    @Override
     public Stream setCPULoad(Number load) {
         _node.setCPULoad(load);
         return this;
@@ -135,6 +137,7 @@ public class Stream implements IAggregatableStream {
      * Sets the Memory Load resources for the current operation.
      * offHeap becomes default
      */
+    @Override
     public Stream setMemoryLoad(Number onHeap) {
         _node.setMemoryLoad(onHeap);
         return this;
@@ -143,6 +146,7 @@ public class Stream implements IAggregatableStream {
     /**
      * Sets the Memory Load resources for the current operation.
      */
+    @Override
     public Stream setMemoryLoad(Number onHeap, Number offHeap) {
         _node.setMemoryLoad(onHeap, offHeap);
         return this;

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java 
b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
index fafd5f9..18b60e0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
@@ -17,10 +17,11 @@
  */
 package org.apache.storm.trident;
 
+import org.apache.storm.topology.ResourceDeclarer;
 import org.apache.storm.trident.planner.Node;
 
 
-public class TridentState {
+public class TridentState implements ResourceDeclarer<TridentState> {
     TridentTopology _topology;
     Node _node;
 
@@ -38,16 +39,19 @@ public class TridentState {
         return this;
     }
 
+    @Override
     public TridentState setCPULoad(Number load) {
         _node.setCPULoad(load);
         return this;
     }
 
+    @Override
     public TridentState setMemoryLoad(Number onHeap) {
         _node.setMemoryLoad(onHeap);
         return this;
     }
 
+    @Override
     public TridentState setMemoryLoad(Number onHeap, Number offHeap) {
         _node.setMemoryLoad(onHeap, offHeap);
         return this;

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java 
b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index ccf01dd..6a4e92f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -458,7 +458,7 @@ public class TridentTopology {
     private static Map<String, Number> mergeDefaultResources(Map<String, 
Number> res, Map defaultConfig) {
         Map<String, Number> ret = new HashMap<String, Number>();
 
-        Number  onHeapDefault = 
(Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number onHeapDefault = 
(Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
         Number offHeapDefault = 
(Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
         Number cpuLoadDefault = 
(Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
@@ -469,7 +469,7 @@ public class TridentTopology {
             return ret;
         }
 
-        Number  onHeap = 
res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number onHeap = 
res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
         Number offHeap = 
res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
         Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
 
b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
index 72ca27e..d49011a 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -23,18 +23,22 @@ import org.apache.storm.Config;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.topology.ResourceDeclarer;
 
-public class DefaultResourceDeclarer implements ResourceDeclarer, 
ITridentResource {
+/**
+ * @param T Must always be the type of the extending class. i.e.
+ * public class SubResourceDeclarer extends 
DefaultResourceDeclarer<SubResourceDeclarer> {...}
+ */
+public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> 
implements ResourceDeclarer<T>, ITridentResource {
 
     private Map<String, Number> resources = new HashMap<>();
     private Map conf = Utils.readStormConfig();
 
     @Override
-    public DefaultResourceDeclarer setMemoryLoad(Number onHeap) {
+    public T setMemoryLoad(Number onHeap) {
         return setMemoryLoad(onHeap, 
Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
     }
 
     @Override
-    public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number 
offHeap) {
+    public T setMemoryLoad(Number onHeap, Number offHeap) {
         if (onHeap != null) {
             onHeap = onHeap.doubleValue();
             
resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
@@ -43,16 +47,16 @@ public class DefaultResourceDeclarer implements 
ResourceDeclarer, ITridentResour
             offHeap = offHeap.doubleValue();
             
resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
         }
-        return this;
+        return (T)this;
     }
 
     @Override
-    public DefaultResourceDeclarer setCPULoad(Number amount) {
+    public T setCPULoad(Number amount) {
         if(amount != null) {
             amount = amount.doubleValue();
             resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
         }
-        return this;
+        return (T)this;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java 
b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index e39ec50..b2466e6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
 
-public class Node extends DefaultResourceDeclarer implements Serializable {
+public class Node extends DefaultResourceDeclarer<Node> implements 
Serializable {
     private static final AtomicInteger INDEX = new AtomicInteger(0);
     
     private String nodeId;

Reply via email to