Ready for PR
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fbfb1ca0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fbfb1ca0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fbfb1ca0 Branch: refs/heads/master Commit: fbfb1ca0bb97ac2001d139eab56fef6917680340 Parents: 17a55d2 Author: Kyle Nusbaum <[email protected]> Authored: Wed Mar 9 14:57:30 2016 -0600 Committer: Kyle Nusbaum <[email protected]> Committed: Wed Mar 9 14:57:30 2016 -0600 ---------------------------------------------------------------------- .../clj/org/apache/storm/trident/testing.clj | 12 +- .../jvm/org/apache/storm/trident/Stream.java | 25 ++++ .../org/apache/storm/trident/TridentState.java | 21 +++- .../org/apache/storm/trident/planner/Node.java | 5 +- .../apache/storm/trident/integration_test.clj | 114 ++++++++++++++++--- 5 files changed, 150 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/clj/org/apache/storm/trident/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj index 0ec5613..9ddd94b 100644 --- a/storm-core/src/clj/org/apache/storm/trident/testing.clj +++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj @@ -56,14 +56,14 @@ (.shutdown ~drpc) )) -(defn with-topology* [cluster topo body-fn] - (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo)) +(defn with-topology* [cluster storm-topo body-fn] + (t/submit-local-topology (:nimbus cluster) "tester" {} storm-topo) (body-fn) - (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))) - ) + (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))) -(defmacro with-topology [[cluster topo] & body] - `(with-topology* ~cluster ~topo (fn [] ~@body))) +(defmacro with-topology [[cluster topo storm-topo] & body] + `(let [~storm-topo (.build ~topo)] + (with-topology* ~cluster ~storm-topo (fn [] ~@body)))) (defn bootstrap-imports [] (import 'org.apache.storm.LocalDRPC) http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index d313678..e13cb49 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -124,6 +124,31 @@ public class Stream implements IAggregatableStream { } /** + * Sets the CPU Load resource for the current node + */ + public Stream setCPULoad(Number load) { + _node.setCPULoad(load); + return this; + } + + /** + * Sets the Memory Load resources for the current node. + * offHeap becomes default + */ + public Stream setMemoryLoad(Number onHeap) { + _node.setMemoryLoad(onHeap); + return this; + } + + /** + * Sets the Memory Load resources for the current node + */ + public Stream setMemoryLoad(Number onHeap, Number offHeap) { + _node.setMemoryLoad(onHeap, offHeap); + return this; + } + + /** * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`. * * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling" http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/TridentState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java index 7173254..fafd5f9 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java @@ -23,18 +23,33 @@ import org.apache.storm.trident.planner.Node; public class TridentState { TridentTopology _topology; Node _node; - + protected TridentState(TridentTopology topology, Node node) { _topology = topology; _node = node; } - + public Stream newValuesStream() { return new Stream(_topology, _node.name, _node); } - + public TridentState parallelismHint(int parallelism) { _node.parallelismHint = parallelism; return this; } + + public TridentState setCPULoad(Number load) { + _node.setCPULoad(load); + return this; + } + + public TridentState setMemoryLoad(Number onHeap) { + _node.setMemoryLoad(onHeap); + return this; + } + + public TridentState setMemoryLoad(Number onHeap, Number offHeap) { + _node.setMemoryLoad(onHeap, offHeap); + return this; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java index 64d8a3b..e39ec50 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java +++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java @@ -17,6 +17,7 @@ */ package org.apache.storm.trident.planner; +import org.apache.storm.trident.operation.DefaultResourceDeclarer; import org.apache.storm.tuple.Fields; import java.io.Serializable; import java.util.UUID; @@ -25,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; -public class Node implements Serializable { +public class Node extends DefaultResourceDeclarer implements Serializable { private static final AtomicInteger INDEX = new AtomicInteger(0); private String nodeId; @@ -62,6 +63,4 @@ public class Node implements Serializable { public String toString() { return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); } - - } http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj index 57edb70..14e6c5b 100644 --- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj @@ -19,9 +19,13 @@ (:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength TrueFilter MemoryMapState$Factory]) (:import [org.apache.storm.trident.state StateSpec]) - (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]) - (:use [org.apache.storm.trident testing])) - + (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater] + [org.apache.storm.trident.operation BaseFunction] + [org.json.simple.parser JSONParser] + [org.apache.storm Config]) + (:use [org.apache.storm.trident testing] + [org.apache.storm log util config])) + (bootstrap-imports) (defmacro letlocals @@ -49,13 +53,13 @@ (.groupBy (fields "word")) (.persistentAggregate (memory-map-state) (Count.) (fields "count")) (.parallelismHint 6) - )) + )) (-> topo (.newDRPCStream "all-tuples" drpc) (.broadcast) (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count")) (.project (fields "word" "count"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (feed feeder [["hello the man said"] ["the"]]) (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]} (into #{} (exec-drpc drpc "all-tuples" "man")))) @@ -84,7 +88,7 @@ (.stateQuery word-counts (fields "word") (MapGet.) (fields "count")) (.aggregate (fields "count") (Sum.) (fields "sum")) (.project (fields "sum"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (feed feeder [["hello the man said"] ["the"]]) (is (= [[2]] (exec-drpc drpc "words" "the"))) (is (= [[1]] (exec-drpc drpc "words" "hello"))) @@ -94,7 +98,7 @@ (is (= [[8]] (exec-drpc drpc "words" "man where you the"))) ))))) -;; this test reproduces a bug where committer spouts freeze processing when +;; this test reproduces a bug where committer spouts freeze processing when ;; there's at least one repartitioning after the spout (deftest test-word-count-committer-spout (t/with-local-cluster [cluster] @@ -119,7 +123,7 @@ (.stateQuery word-counts (fields "word") (MapGet.) (fields "count")) (.aggregate (fields "count") (Sum.) (fields "sum")) (.project (fields "sum"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (feed feeder [["hello the man said"] ["the"]]) (is (= [[2]] (exec-drpc drpc "words" "the"))) (is (= [[1]] (exec-drpc drpc "words" "hello"))) @@ -146,13 +150,13 @@ (.aggregate (CountAsAggregator.) (fields "count")) (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly (.project (fields "count"))) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (doseq [i (range 100)] (is (= [[1]] (exec-drpc drpc "numwords" "the")))) (is (= [[0]] (exec-drpc drpc "numwords" ""))) (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8"))) ))))) - + (deftest test-split-merge (t/with-local-cluster [cluster] (with-drpc [drpc] @@ -169,7 +173,7 @@ (.project (fields "len")))) (.merge topo [s1 s2]) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man"))) (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello"))) ))))) @@ -191,11 +195,11 @@ (.aggregate (CountAsAggregator.) (fields "count")))) (.merge topo [s1 s2]) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the"))) (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa"))) ))))) - + (deftest test-multi-repartition (t/with-local-cluster [cluster] (with-drpc [drpc] @@ -207,7 +211,7 @@ (.shuffle) (.aggregate (CountAsAggregator.) (fields "count")) )) - (with-topology [cluster topo] + (with-topology [cluster topo storm-topo] (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man"))) (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa"))) ))))) @@ -281,6 +285,86 @@ (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count")))))) ))) + +(deftest test-set-component-resources + (t/with-local-cluster [cluster] + (with-drpc [drpc] + (letlocals + (bind topo (TridentTopology.)) + (bind feeder (feeder-spout ["sentence"])) + (bind add-bang (proxy [BaseFunction] [] + (execute [tuple collector] + (. collector emit (str (. tuple getString 0) "!"))))) + (bind word-counts + (.. topo + (newStream "words" feeder) + (parallelismHint 5) + (setCPULoad 20) + (setMemoryLoad 512 256) + (each (fields "sentence") (Split.) (fields "word")) + (setCPULoad 10) + (setMemoryLoad 512) + (each (fields "word") add-bang (fields "word!")) + (parallelismHint 10) + (setCPULoad 50) + (setMemoryLoad 1024) + (groupBy (fields "word!")) + (persistentAggregate (memory-map-state) (Count.) (fields "count")) + (setCPULoad 100) + (setMemoryLoad 2048))) + (with-topology [cluster topo storm-topo] +; (log-message "\n") +; (log-message "Getting json confs from bolts:") +;; (log-message "Bolts: " (. storm-topo get_bolts) "(" (. storm-topo get_bolts_size) ")") +; (doall (map (fn [[k v]] (log-message k ":" (.. v get_common get_json_conf))) (. storm-topo get_bolts))) + + (let [parse-fn (fn [[k v]] + [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))]) + json-confs (into {} (map parse-fn (. storm-topo get_bolts)))] + (testing "spout memory" + (is (= (-> (json-confs "spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + 512.0)) + + (is (= (-> (json-confs "spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB)) + 256.0)) + + (is (= (-> (json-confs "$spoutcoord-spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + 512.0)) + + (is (= (-> (json-confs "$spoutcoord-spout-words") + (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB)) + 256.0))) + + (testing "spout CPU" + (is (= (-> (json-confs "spout-words") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 20.0)) + + (is (= (-> (json-confs "$spoutcoord-spout-words") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 20.0))) + + (testing "bolt combinations" + (is (= (-> (json-confs "b-1") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + 1536.0)) + + (is (= (-> (json-confs "b-1") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 60.0))) + + (testing "aggregations after partition" + (is (= (-> (json-confs "b-0") + (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB)) + 2048.0)) + + (is (= (-> (json-confs "b-0") + (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT)) + 100.0))))))))) + ;; (deftest test-split-merge ;; (t/with-local-cluster [cluster] ;; (with-drpc [drpc] @@ -295,7 +379,7 @@ ;; (-> drpc-stream ;; (.each (fields "args") (StringLength.) (fields "len")) ;; (.project (fields "len")))) -;; +;; ;; (.merge topo [s1 s2]) ;; (with-topology [cluster topo] ;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
