Ready for PR

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fbfb1ca0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fbfb1ca0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fbfb1ca0

Branch: refs/heads/master
Commit: fbfb1ca0bb97ac2001d139eab56fef6917680340
Parents: 17a55d2
Author: Kyle Nusbaum <[email protected]>
Authored: Wed Mar 9 14:57:30 2016 -0600
Committer: Kyle Nusbaum <[email protected]>
Committed: Wed Mar 9 14:57:30 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/trident/testing.clj    |  12 +-
 .../jvm/org/apache/storm/trident/Stream.java    |  25 ++++
 .../org/apache/storm/trident/TridentState.java  |  21 +++-
 .../org/apache/storm/trident/planner/Node.java  |   5 +-
 .../apache/storm/trident/integration_test.clj   | 114 ++++++++++++++++---
 5 files changed, 150 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj
index 0ec5613..9ddd94b 100644
--- a/storm-core/src/clj/org/apache/storm/trident/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj
@@ -56,14 +56,14 @@
      (.shutdown ~drpc)
      ))
 
-(defn with-topology* [cluster topo body-fn]
-  (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
+(defn with-topology* [cluster storm-topo body-fn]
+  (t/submit-local-topology (:nimbus cluster) "tester" {} storm-topo)
   (body-fn)
-  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
-  )
+  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))))
 
-(defmacro with-topology [[cluster topo] & body]
-  `(with-topology* ~cluster ~topo (fn [] ~@body)))
+(defmacro with-topology [[cluster topo storm-topo] & body]
+  `(let [~storm-topo (.build ~topo)]
+     (with-topology* ~cluster ~storm-topo (fn [] ~@body))))
 
 (defn bootstrap-imports []
   (import 'org.apache.storm.LocalDRPC)

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index d313678..e13cb49 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -124,6 +124,31 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
+     * Sets the CPU Load resource for the current node
+     */
+    public Stream setCPULoad(Number load) {
+        _node.setCPULoad(load);
+        return this;
+    }
+
+    /**
+     * Sets the Memory Load resources for the current node.
+     * offHeap becomes default
+     */
+    public Stream setMemoryLoad(Number onHeap) {
+        _node.setMemoryLoad(onHeap);
+        return this;
+    }
+
+    /**
+     * Sets the Memory Load resources for the current node
+     */
+    public Stream setMemoryLoad(Number onHeap, Number offHeap) {
+        _node.setMemoryLoad(onHeap, offHeap);
+        return this;
+    }
+
+    /**
      * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
      *
      * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling"

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
index 7173254..fafd5f9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
@@ -23,18 +23,33 @@ import org.apache.storm.trident.planner.Node;
 public class TridentState {
     TridentTopology _topology;
     Node _node;
-    
+
     protected TridentState(TridentTopology topology, Node node) {
         _topology = topology;
         _node = node;
     }
-    
+
     public Stream newValuesStream() {
         return new Stream(_topology, _node.name, _node);
     }
-    
+
     public TridentState parallelismHint(int parallelism) {
         _node.parallelismHint = parallelism;
         return this;
     }
+
+    public TridentState setCPULoad(Number load) {
+        _node.setCPULoad(load);
+        return this;
+    }
+
+    public TridentState setMemoryLoad(Number onHeap) {
+        _node.setMemoryLoad(onHeap);
+        return this;
+    }
+
+    public TridentState setMemoryLoad(Number onHeap, Number offHeap) {
+        _node.setMemoryLoad(onHeap, offHeap);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index 64d8a3b..e39ec50 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.planner;
 
+import org.apache.storm.trident.operation.DefaultResourceDeclarer;
 import org.apache.storm.tuple.Fields;
 import java.io.Serializable;
 import java.util.UUID;
@@ -25,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
 
-public class Node implements Serializable {
+public class Node extends DefaultResourceDeclarer implements Serializable {
     private static final AtomicInteger INDEX = new AtomicInteger(0);
     
     private String nodeId;
@@ -62,6 +63,4 @@ public class Node implements Serializable {
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
     }
-    
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index 57edb70..14e6c5b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -19,9 +19,13 @@
   (:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength TrueFilter
             MemoryMapState$Factory])
   (:import [org.apache.storm.trident.state StateSpec])
-  (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater])
-  (:use [org.apache.storm.trident testing]))
-  
+  (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
+           [org.apache.storm.trident.operation BaseFunction]
+           [org.json.simple.parser JSONParser]
+           [org.apache.storm Config])
+  (:use [org.apache.storm.trident testing]
+        [org.apache.storm log util config]))
+
 (bootstrap-imports)
 
 (defmacro letlocals
@@ -49,13 +53,13 @@
               (.groupBy (fields "word"))
               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
               (.parallelismHint 6)
-              ))       
+              ))
         (-> topo
             (.newDRPCStream "all-tuples" drpc)
             (.broadcast)
             (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
             (.project (fields "word" "count")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
                  (into #{} (exec-drpc drpc "all-tuples" "man"))))
@@ -84,7 +88,7 @@
             (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
             (.aggregate (fields "count") (Sum.) (fields "sum"))
             (.project (fields "sum")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -94,7 +98,7 @@
           (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
           )))))
 
-;; this test reproduces a bug where committer spouts freeze processing when 
+;; this test reproduces a bug where committer spouts freeze processing when
 ;; there's at least one repartitioning after the spout
 (deftest test-word-count-committer-spout
   (t/with-local-cluster [cluster]
@@ -119,7 +123,7 @@
             (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
             (.aggregate (fields "count") (Sum.) (fields "sum"))
             (.project (fields "sum")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -146,13 +150,13 @@
             (.aggregate (CountAsAggregator.) (fields "count"))
             (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
             (.project (fields "count")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (doseq [i (range 100)]
             (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
           (is (= [[0]] (exec-drpc drpc "numwords" "")))
           (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
           )))))
-          
+
 (deftest test-split-merge
   (t/with-local-cluster [cluster]
     (with-drpc [drpc]
@@ -169,7 +173,7 @@
               (.project (fields "len"))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
           )))))
@@ -191,11 +195,11 @@
               (.aggregate (CountAsAggregator.) (fields "count"))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
           (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
           )))))
-          
+
 (deftest test-multi-repartition
   (t/with-local-cluster [cluster]
     (with-drpc [drpc]
@@ -207,7 +211,7 @@
                                    (.shuffle)
                                    (.aggregate (CountAsAggregator.) (fields 
"count"))
                                    ))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
           (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
           )))))
@@ -281,6 +285,86 @@
                         (.stateQuery word-counts (fields "word1") (MapGet.) 
(fields "count"))))))
      )))
 
+
+(deftest test-set-component-resources
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-spout ["sentence"]))
+        (bind add-bang (proxy [BaseFunction] []
+                         (execute [tuple collector]
+                           (. collector emit (str (. tuple getString 0) "!")))))
+        (bind word-counts
+          (.. topo
+              (newStream "words" feeder)
+              (parallelismHint 5)
+              (setCPULoad 20)
+              (setMemoryLoad 512 256)
+              (each (fields "sentence") (Split.) (fields "word"))
+              (setCPULoad 10)
+              (setMemoryLoad 512)
+              (each (fields "word") add-bang (fields "word!"))
+              (parallelismHint 10)
+              (setCPULoad 50)
+              (setMemoryLoad 1024)
+              (groupBy (fields "word!"))
+              (persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (setCPULoad 100)
+              (setMemoryLoad 2048)))
+        (with-topology [cluster topo storm-topo]
+;          (log-message "\n")
+;          (log-message "Getting json confs from bolts:")
+;;          (log-message "Bolts: " (. storm-topo get_bolts) "(" (. storm-topo get_bolts_size) ")")
+;          (doall (map (fn [[k v]] (log-message k ":" (.. v get_common get_json_conf))) (. storm-topo get_bolts)))
+
+          (let [parse-fn (fn [[k v]]
+                           [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))])
+                json-confs (into {} (map parse-fn (. storm-topo get_bolts)))]
+            (testing "spout memory"
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     512.0))
+
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+                   256.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     512.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+                     256.0)))
+
+            (testing "spout CPU"
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     20.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                       (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     20.0)))
+
+            (testing "bolt combinations"
+              (is (= (-> (json-confs "b-1")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     1536.0))
+
+              (is (= (-> (json-confs "b-1")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     60.0)))
+
+            (testing "aggregations after partition"
+              (is (= (-> (json-confs "b-0")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     2048.0))
+
+              (is (= (-> (json-confs "b-0")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     100.0)))))))))
+
 ;; (deftest test-split-merge
 ;;   (t/with-local-cluster [cluster]
 ;;     (with-drpc [drpc]
@@ -295,7 +379,7 @@
 ;;           (-> drpc-stream
 ;;               (.each (fields "args") (StringLength.) (fields "len"))
 ;;               (.project (fields "len"))))
-;; 
+;;
 ;;         (.merge topo [s1 s2])
 ;;         (with-topology [cluster topo]
 ;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))

Reply via email to