Repository: storm Updated Branches: refs/heads/master bb60f11c0 -> d187a20e2
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 70cb885..42a0374 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -20,7 +20,9 @@ (:require [org.apache.storm [converter :as converter]]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout TestPlannerBolt] - [org.apache.storm.nimbus InMemoryTopologyActionNotifier]) + [org.apache.storm.nimbus InMemoryTopologyActionNotifier] + [org.apache.storm.generated GlobalStreamId] + [org.apache.storm Thrift]) (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) @@ -38,9 +40,7 @@ (:use [org.apache.storm testing MockAutoCred util config log timer zookeeper]) (:use [org.apache.storm.daemon common]) (:require [conjure.core]) - (:require [org.apache.storm - [thrift :as thrift] - [cluster :as cluster]]) + (:require [org.apache.storm [cluster :as cluster]]) (:use [conjure core])) (defn- from-json @@ -211,16 +211,34 @@ :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (let [state (:storm-cluster-state cluster) nimbus (:nimbus cluster) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 4) - "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))}) - topology2 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 12)} - {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 6) - "3" (thrift/mk-bolt-spec {"1" :global} (TestPlannerBolt.) :parallelism-hint 8) - "4" (thrift/mk-bolt-spec {"1" :global "2" :none} (TestPlannerBolt.) :parallelism-hint 4)} - ) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. false) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 4)) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.))}) + topology2 (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 12))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 6)) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping)} + (TestPlannerBolt.) (Integer. 8)) + "4" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping) + (Utils/getGlobalStreamId "2" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 4))}) _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology) _ (advance-cluster-time cluster 11) task-info (storm-component->task-info cluster "mystorm")] @@ -278,10 +296,17 @@ topology-name "test-auto-cred-storm" submitOptions (SubmitOptions. TopologyInitialStatus/INACTIVE) - (.set_creds submitOptions (Credentials. (HashMap.))) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 4) - "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))}) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. false) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 4)) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.))}) _ (submit-local-topology-with-opts nimbus topology-name {TOPOLOGY-WORKERS 4 TOPOLOGY-AUTO-CREDENTIALS (list "org.apache.storm.MockAutoCred") } topology submitOptions) @@ -320,10 +345,17 @@ (letlocals (bind state (:storm-cluster-state cluster)) (bind nimbus (:nimbus cluster)) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 5) - "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))})) + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. false) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 5)) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.))})) (submit-local-topology nimbus "noniso" {TOPOLOGY-WORKERS 4} topology) (advance-cluster-time cluster 11) @@ -365,10 +397,20 @@ (with-simulated-time-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (let [state (:storm-cluster-state cluster) nimbus (:nimbus cluster) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3 :conf {TOPOLOGY-TASKS 0})} - {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 1 :conf {TOPOLOGY-TASKS 2}) - "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) :conf {TOPOLOGY-TASKS 5})}) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. false) (Integer. 3) + {TOPOLOGY-TASKS 0})} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 1) + {TOPOLOGY-TASKS 2}) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) nil + {TOPOLOGY-TASKS 5})}) _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology) _ (advance-cluster-time cluster 11) task-info (storm-component->task-info cluster "mystorm")] @@ -383,10 +425,19 @@ (deftest test-executor-assignments (with-simulated-time-local-cluster[cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (let [nimbus (:nimbus cluster) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3 :conf {TOPOLOGY-TASKS 5})} - {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 8 :conf {TOPOLOGY-TASKS 2}) - "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) :parallelism-hint 3)}) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3) + {TOPOLOGY-TASKS 5})} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 8) + {TOPOLOGY-TASKS 2}) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 3))}) _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology) _ (advance-cluster-time cluster 11) task-info (storm-component->task-info cluster "mystorm") @@ -408,12 +459,21 @@ :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (let [state (:storm-cluster-state cluster) nimbus (:nimbus cluster) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 21)} - {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 9) - "3" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 2) - "4" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 10)} - ) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 21))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 9)) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 2)) + "4" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareNoneGrouping)} + (TestPlannerBolt.) (Integer. 10))}) _ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology) _ (advance-cluster-time cluster 11) task-info (storm-component->task-info cluster "test")] @@ -436,10 +496,10 @@ (stubbing [nimbus/user-groups ["alice-group"]] (letlocals (bind conf (:daemon-conf cluster)) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)} - {} - )) + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 4))} + {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology) (bind storm-id (get-storm-id state "test")) @@ -532,10 +592,10 @@ TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (letlocals (bind conf (:daemon-conf cluster)) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 14)} - {} - )) + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 14))} + {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology) (bind storm-id (get-storm-id state "test")) @@ -626,10 +686,10 @@ TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (letlocals (bind conf (:daemon-conf cluster)) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)} - {} - )) + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 2))} + {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology) (advance-cluster-time cluster 11) @@ -747,10 +807,10 @@ (add-supervisor cluster :ports 1 :id "a") (add-supervisor cluster :ports 1 :id "b") (bind conf (:daemon-conf cluster)) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)} - {} - )) + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 2))} + {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology) (advance-cluster-time cluster 11) @@ -803,8 +863,9 @@ TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (letlocals - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 9)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 9))} {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4} topology) ; distribution should be 2, 2, 2, 3 ideally @@ -852,8 +913,9 @@ TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (letlocals - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) @@ -898,10 +960,10 @@ TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (letlocals - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) - :parallelism-hint 6 - :conf {TOPOLOGY-TASKS 12})} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 6) + {TOPOLOGY-TASKS 12})} {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) @@ -976,14 +1038,17 @@ TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] (letlocals - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) - (bind topology2 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology2 (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) - (bind topology3 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology3 (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (bind state (:storm-cluster-state cluster)) (submit-local-topology (:nimbus cluster) @@ -1023,18 +1088,20 @@ NIMBUS-EXECUTORS-PER-TOPOLOGY 8 NIMBUS-SLOTS-PER-TOPOLOGY 8}] (letlocals - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 1 :conf {TOPOLOGY-TASKS 1})} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 1) + {TOPOLOGY-TASKS 1})} {})) (is (thrown? InvalidTopologyException (submit-local-topology (:nimbus cluster) "test/aaa" {} topology))) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) - :parallelism-hint 16 - :conf {TOPOLOGY-TASKS 16})} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 16) + {TOPOLOGY-TASKS 16})} {})) (bind state (:storm-cluster-state cluster)) (is (thrown? InvalidTopologyException @@ -1042,10 +1109,10 @@ "test" {TOPOLOGY-WORKERS 3} topology))) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) - :parallelism-hint 5 - :conf {TOPOLOGY-TASKS 5})} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 5) + {TOPOLOGY-TASKS 5})} {})) (is (thrown? InvalidTopologyException (submit-local-topology (:nimbus cluster) @@ -1080,8 +1147,9 @@ STORM-LOCAL-DIR nimbus-dir})) (bind cluster-state (cluster/mk-storm-cluster-state conf)) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (submit-local-topology nimbus "t1" {} topology) (submit-local-topology nimbus "t2" {} topology) @@ -1152,8 +1220,9 @@ STORM-LOCAL-DIR nimbus-dir})) (bind cluster-state (cluster/mk-storm-cluster-state conf)) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] @@ -1204,7 +1273,7 @@ "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}] (let [ nimbus (:nimbus cluster) - topology (thrift/mk-topology {} {}) + topology (Thrift/buildTopology {} {}) ] (is (thrown? AuthorizationException (submit-local-topology-with-opts nimbus "mystorm" {} topology @@ -1220,7 +1289,7 @@ "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}] (let [ nimbus (:nimbus cluster) - topology (thrift/mk-topology {} {}) + topology (Thrift/buildTopology {} {}) ] ; Fake good authorization as part of setup. (mocking [nimbus/check-authorization!] @@ -1247,7 +1316,7 @@ :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}] (let [nimbus (:nimbus cluster) topology-name "test-nimbus-check-autho-params" - topology (thrift/mk-topology {} {})] + topology (Thrift/buildTopology {} {})] (submit-local-topology-with-opts nimbus topology-name {} topology (SubmitOptions. TopologyInitialStatus/INACTIVE)) @@ -1432,7 +1501,7 @@ (deftest test-validate-topo-config-on-submit (with-local-cluster [cluster] (let [nimbus (:nimbus cluster) - topology (thrift/mk-topology {} {}) + topology (Thrift/buildTopology {} {}) bad-config {"topology.isolate.machines" "2"}] ; Fake good authorization as part of setup. (mocking [nimbus/check-authorization!] @@ -1453,8 +1522,9 @@ (bind cluster-state (cluster/mk-storm-cluster-state conf)) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) (Time/sleepSecs 1) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology) ; make transition for topology t1 to be killed -> nimbus applies this event to cluster state @@ -1486,8 +1556,9 @@ (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) (bind notifier (InMemoryTopologyActionNotifier.)) (Time/sleepSecs 1) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (submit-local-topology nimbus "test-notification" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology) @@ -1512,8 +1583,9 @@ (deftest test-debug-on-component (with-local-cluster [cluster] (let [nimbus (:nimbus cluster) - topology (thrift/mk-topology - {"spout" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + topology (Thrift/buildTopology + {"spout" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})] (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology) (.debug nimbus "t1" "spout" true 100)))) @@ -1521,8 +1593,9 @@ (deftest test-debug-on-global (with-local-cluster [cluster] (let [nimbus (:nimbus cluster) - topology (thrift/mk-topology - {"spout" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + topology (Thrift/buildTopology + {"spout" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})] (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology) (.debug nimbus "t1" "" true 100)))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj index f613a5b..4ca0721 100644 --- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj +++ b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj @@ -15,7 +15,8 @@ ;; limitations under the License. (ns org.apache.storm.scheduler.resource-aware-scheduler-test (:use [clojure test]) - (:use [org.apache.storm util config testing thrift]) + (:use [org.apache.storm util config testing]) + (:use [org.apache.storm.internal thrift]) (:require [org.apache.storm.util :refer [map-val]]) (:require [org.apache.storm.daemon [nimbus :as nimbus]]) (:import [org.apache.storm.generated StormTopology] http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj index 9c31ddf..345cb24 100644 --- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj +++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj @@ -31,10 +31,12 @@ [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller]) (:import [java.nio.file.attribute FileAttribute]) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils]) (:use [org.apache.storm config testing util timer log]) (:use [org.apache.storm.daemon common]) (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]] - [org.apache.storm [thrift :as thrift] [cluster :as cluster]]) + [org.apache.storm [cluster :as cluster]]) (:use [conjure core]) (:require [clojure.java.io :as io])) @@ -103,8 +105,9 @@ SUPERVISOR-WORKER-TIMEOUT-SECS 15 SUPERVISOR-MONITOR-FREQUENCY-SECS 3}] (letlocals - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 4))} {})) (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4])) (bind changed (capture-changed-workers @@ -156,11 +159,13 @@ SUPERVISOR-WORKER-TIMEOUT-SECS 15 SUPERVISOR-MONITOR-FREQUENCY-SECS 3}] (letlocals - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 4))} {})) - (bind topology2 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology2 (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4])) (bind sup2 (add-supervisor cluster :id "sup2" :ports [1 2])) @@ -270,8 +275,9 @@ (check-heartbeat cluster "sup" 3) (advance-cluster-time cluster 15) (check-heartbeat cluster "sup" 3) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 4))} {})) ;; prevent them from launching by capturing them (capture-changed-workers @@ -646,7 +652,7 @@ (supervisor/supervisor-data auth-conf nil fake-isupervisor) (verify-call-times-for cluster/mk-storm-cluster-state 1) (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2] - expected-acls))))) + expected-acls)))))) (deftest test-write-log-metadata (testing "supervisor writes correct data to logs metadata file" @@ -769,66 +775,68 @@ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)] (is (= expected-childopts childopts-with-ids))))) - (deftest test-retry-read-assignments - (with-simulated-time-local-cluster [cluster - :supervisors 0 - :ports-per-supervisor 2 - :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true - NIMBUS-MONITOR-FREQ-SECS 10 - TOPOLOGY-MESSAGE-TIMEOUT-SECS 30 - TOPOLOGY-ACKER-EXECUTORS 0}] - (letlocals - (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4])) - (bind topology1 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)} - {})) - (bind topology2 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)} - {})) - (bind state (:storm-cluster-state cluster)) - (bind changed (capture-changed-workers - (submit-mocked-assignment - (:nimbus cluster) - (:storm-cluster-state cluster) - "topology1" - {TOPOLOGY-WORKERS 2} - topology1 - {1 "1" - 2 "1"} - {[1 1] ["sup1" 1] - [2 2] ["sup1" 2]} - {["sup1" 1] [0.0 0.0 0.0] - ["sup1" 2] [0.0 0.0 0.0] - }) - (submit-mocked-assignment - (:nimbus cluster) - (:storm-cluster-state cluster) - "topology2" - {TOPOLOGY-WORKERS 2} - topology2 - {1 "1" - 2 "1"} - {[1 1] ["sup1" 1] - [2 2] ["sup1" 2]} - {["sup1" 1] [0.0 0.0 0.0] - ["sup1" 2] [0.0 0.0 0.0] - }) - ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. - (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0))) - )) - (is (empty? (:launched changed))) - (bind options (RebalanceOptions.)) - (.set_wait_secs options 0) - (bind changed (capture-changed-workers - (.rebalance (:nimbus cluster) "topology2" options) - (advance-cluster-time cluster 10) - (heartbeat-workers cluster "sup1" [1 2 3 4]) - (advance-cluster-time cluster 10) - )) - (validate-launched-once (:launched changed) - {"sup1" [1 2]} - (get-storm-id (:storm-cluster-state cluster) "topology1")) - (validate-launched-once (:launched changed) - {"sup1" [3 4]} - (get-storm-id (:storm-cluster-state cluster) "topology2")) - )))) +(deftest test-retry-read-assignments + (with-simulated-time-local-cluster [cluster + :supervisors 0 + :ports-per-supervisor 2 + :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true + NIMBUS-MONITOR-FREQ-SECS 10 + TOPOLOGY-MESSAGE-TIMEOUT-SECS 30 + TOPOLOGY-ACKER-EXECUTORS 0}] + (letlocals + (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4])) + (bind topology1 (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 2))} + {})) + (bind topology2 (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 2))} + {})) + (bind state (:storm-cluster-state cluster)) + (bind changed (capture-changed-workers + (submit-mocked-assignment + (:nimbus cluster) + (:storm-cluster-state cluster) + "topology1" + {TOPOLOGY-WORKERS 2} + topology1 + {1 "1" + 2 "1"} + {[1 1] ["sup1" 1] + [2 2] ["sup1" 2]} + {["sup1" 1] [0.0 0.0 0.0] + ["sup1" 2] [0.0 0.0 0.0] + }) + (submit-mocked-assignment + (:nimbus cluster) + (:storm-cluster-state cluster) + "topology2" + {TOPOLOGY-WORKERS 2} + topology2 + {1 "1" + 2 "1"} + {[1 1] ["sup1" 1] + [2 2] ["sup1" 2]} + {["sup1" 1] [0.0 0.0 0.0] + ["sup1" 2] [0.0 0.0 0.0] + }) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0))) + )) + (is (empty? (:launched changed))) + (bind options (RebalanceOptions.)) + (.set_wait_secs options 0) + (bind changed (capture-changed-workers + (.rebalance (:nimbus cluster) "topology2" options) + (advance-cluster-time cluster 10) + (heartbeat-workers cluster "sup1" [1 2 3 4]) + (advance-cluster-time cluster 10) + )) + (validate-launched-once (:launched changed) + {"sup1" [1 2]} + (get-storm-id (:storm-cluster-state cluster) "topology1")) + (validate-launched-once (:launched changed) + {"sup1" [3 4]} + (get-storm-id (:storm-cluster-state cluster) "topology2")) + ))) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj b/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj index 543db09..99ad957 100644 --- a/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj +++ b/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj @@ -15,9 +15,11 @@ ;; limitations under the License. (ns org.apache.storm.tick-tuple-test (:use [clojure test]) - (:use [org.apache.storm testing clojure config]) + (:use [org.apache.storm testing config]) + (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common]) - (:require [org.apache.storm [thrift :as thrift]])) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) (defbolt noop-bolt ["tuple"] {:prepare true} [conf context collector] @@ -31,9 +33,12 @@ (deftest test-tick-tuple-works-with-system-bolt (with-simulated-time-local-cluster [cluster] - (let [topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec noop-spout)} - {"2" (thrift/mk-bolt-spec {"1" ["tuple"]} noop-bolt)})] + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails noop-spout)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareFieldsGrouping ["tuple"])} + noop-bolt)})] (try (submit-local-topology (:nimbus cluster) "test" http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/transactional_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/transactional_test.clj b/storm-core/test/clj/org/apache/storm/transactional_test.clj index dd46a7d..b8af518 100644 --- a/storm-core/test/clj/org/apache/storm/transactional_test.clj +++ b/storm-core/test/clj/org/apache/storm/transactional_test.clj @@ -36,7 +36,8 @@ (:import [org.mockito Matchers Mockito]) (:import [org.mockito.exceptions.base MockitoAssertionError]) (:import [java.util HashMap Collections ArrayList]) - (:use [org.apache.storm testing util config clojure]) + (:use [org.apache.storm testing util config]) + (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common])) ;; Testing TODO:
