Addressing comments.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3323f30 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3323f30 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3323f30 Branch: refs/heads/master Commit: d3323f305c4835274a219bbe354597856ce6a1ff Parents: fbfb1ca Author: Kyle Nusbaum <[email protected]> Authored: Fri Mar 11 11:22:06 2016 -0600 Committer: Kyle Nusbaum <[email protected]> Committed: Fri Mar 11 11:22:06 2016 -0600 ---------------------------------------------------------------------- storm-core/src/jvm/org/apache/storm/trident/Stream.java | 6 +++--- .../src/jvm/org/apache/storm/trident/TridentTopology.java | 7 +------ .../integration/org/apache/storm/trident/integration_test.clj | 6 +----- 3 files changed, 5 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d3323f30/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index e13cb49..b680977 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -124,7 +124,7 @@ public class Stream implements IAggregatableStream { } /** - * Sets the CPU Load resource for the current node + * Sets the CPU Load resource for the current operation */ public Stream setCPULoad(Number load) { _node.setCPULoad(load); @@ -132,7 +132,7 @@ public class Stream implements IAggregatableStream { } /** - * Sets the Memory Load resources for the current node. + * Sets the Memory Load resources for the current operation. * offHeap becomes default */ public Stream setMemoryLoad(Number onHeap) { @@ -141,7 +141,7 @@ public class Stream implements IAggregatableStream { } /** - * Sets the Memory Load resources for the current node + * Sets the Memory Load resources for the current operation. */ public Stream setMemoryLoad(Number onHeap, Number offHeap) { _node.setMemoryLoad(onHeap, offHeap); http://git-wip-us.apache.org/repos/asf/storm/blob/d3323f30/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java index 3836663..ccf01dd 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java @@ -401,12 +401,7 @@ public class TridentTopology { Integer parallelism = parallelisms.get(grouper.nodeGroup(sn)); Map<String, Number> spoutRes = null; - if(sn instanceof ITridentResource) { - spoutRes = mergeDefaultResources(((ITridentResource)sn).getResources(), defaults); - } - else { - spoutRes = mergeDefaultResources(null, defaults); - } + spoutRes = mergeDefaultResources(sn.getResources(), defaults); Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); http://git-wip-us.apache.org/repos/asf/storm/blob/d3323f30/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj index 14e6c5b..c205571 100644 --- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj @@ -313,10 +313,6 @@ (setCPULoad 100) (setMemoryLoad 2048))) (with-topology [cluster topo storm-topo] -; (log-message "\n") -; (log-message "Getting json confs from bolts:") -;; (log-message "Bolts: " (. storm-topo get_bolts) "(" (. storm-topo get_bolts_size) ")") -; (doall (map (fn [[k v]] (log-message k ":" (.. v get_common get_json_conf))) (. storm-topo get_bolts))) (let [parse-fn (fn [[k v]] [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))]) @@ -350,7 +346,7 @@ (testing "bolt combinations" (is (= (-> (json-confs "b-1") (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) - 1536.0)) + (+ 1024.0 512.0))) (is (= (-> (json-confs "b-1") (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
