Repository: storm
Updated Branches:
  refs/heads/master bb60f11c0 -> d187a20e2


http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj 
b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 70cb885..42a0374 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -20,7 +20,9 @@
   (:require [org.apache.storm [converter :as converter]])
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout 
TestGlobalCount
             TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
-           [org.apache.storm.nimbus InMemoryTopologyActionNotifier])
+           [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
+           [org.apache.storm.generated GlobalStreamId]
+           [org.apache.storm Thrift])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
   (:import [org.apache.storm.scheduler INimbus])
   (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
@@ -38,9 +40,7 @@
   (:use [org.apache.storm testing MockAutoCred util config log timer 
zookeeper])
   (:use [org.apache.storm.daemon common])
   (:require [conjure.core])
-  (:require [org.apache.storm
-             [thrift :as thrift]
-             [cluster :as cluster]])
+  (:require [org.apache.storm [cluster :as cluster]])
   (:use [conjure core]))
 
 (defn- from-json
@@ -211,16 +211,34 @@
                        :daemon-conf {SUPERVISOR-ENABLE false 
TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (let [state (:storm-cluster-state cluster)
           nimbus (:nimbus cluster)
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) 
:parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 4)
-                     "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))})
-          topology2 (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 12)}
-                     {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 6)
-                      "3" (thrift/mk-bolt-spec {"1" :global} 
(TestPlannerBolt.) :parallelism-hint 8)
-                      "4" (thrift/mk-bolt-spec {"1" :global "2" :none} 
(TestPlannerBolt.) :parallelism-hint 4)}
-                     )
+          topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails
+                            (TestPlannerSpout. false) (Integer. 3))}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareNoneGrouping)}
+                            (TestPlannerBolt.) (Integer. 4))
+                      "3" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "2" nil)
+                             (Thrift/prepareNoneGrouping)}
+                            (TestPlannerBolt.))})
+          topology2 (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails
+                             (TestPlannerSpout. true) (Integer. 12))}
+                      {"2" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareNoneGrouping)}
+                             (TestPlannerBolt.) (Integer. 6))
+                       "3" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareGlobalGrouping)}
+                             (TestPlannerBolt.) (Integer. 8))
+                       "4" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareGlobalGrouping)
+                              (Utils/getGlobalStreamId "2" nil)
+                              (Thrift/prepareNoneGrouping)}
+                             (TestPlannerBolt.) (Integer. 4))})
           _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} 
topology)
           _ (advance-cluster-time cluster 11)
           task-info (storm-component->task-info cluster "mystorm")]
@@ -278,10 +296,17 @@
           topology-name "test-auto-cred-storm"
           submitOptions (SubmitOptions. TopologyInitialStatus/INACTIVE)
           - (.set_creds submitOptions (Credentials. (HashMap.)))
-          topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) 
:parallelism-hint 3)}
-                     {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 4)
-                      "3" (thrift/mk-bolt-spec {"2" :none} 
(TestPlannerBolt.))})
+          topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails
+                            (TestPlannerSpout. false) (Integer. 3))}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareNoneGrouping)}
+                            (TestPlannerBolt.) (Integer. 4))
+                      "3" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "2" nil)
+                             (Thrift/prepareNoneGrouping)}
+                            (TestPlannerBolt.))})
           _ (submit-local-topology-with-opts nimbus topology-name 
{TOPOLOGY-WORKERS 4
                                                                
TOPOLOGY-AUTO-CREDENTIALS (list "org.apache.storm.MockAutoCred")
                                                                } topology 
submitOptions)
@@ -320,10 +345,17 @@
     (letlocals
       (bind state (:storm-cluster-state cluster))
       (bind nimbus (:nimbus cluster))
-      (bind topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) 
:parallelism-hint 3)}
-                      {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 5)
-                       "3" (thrift/mk-bolt-spec {"2" :none} 
(TestPlannerBolt.))}))
+      (bind topology (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails
+                             (TestPlannerSpout. false) (Integer. 3))}
+                      {"2" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareNoneGrouping)}
+                             (TestPlannerBolt.) (Integer. 5))
+                       "3" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "2" nil)
+                              (Thrift/prepareNoneGrouping)}
+                             (TestPlannerBolt.))}))
 
       (submit-local-topology nimbus "noniso" {TOPOLOGY-WORKERS 4} topology)
       (advance-cluster-time cluster 11)
@@ -365,10 +397,20 @@
   (with-simulated-time-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE 
false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (let [state (:storm-cluster-state cluster)
           nimbus (:nimbus cluster)
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) 
:parallelism-hint 3 :conf {TOPOLOGY-TASKS 0})}
-                    {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 1 :conf {TOPOLOGY-TASKS 2})
-                     "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) 
:conf {TOPOLOGY-TASKS 5})})
+          topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails
+                           (TestPlannerSpout. false) (Integer. 3)
+                           {TOPOLOGY-TASKS 0})}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareNoneGrouping)}
+                           (TestPlannerBolt.) (Integer. 1)
+                           {TOPOLOGY-TASKS 2})
+                     "3" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "2" nil)
+                            (Thrift/prepareNoneGrouping)}
+                           (TestPlannerBolt.) nil
+                           {TOPOLOGY-TASKS 5})})
           _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} 
topology)
           _ (advance-cluster-time cluster 11)
           task-info (storm-component->task-info cluster "mystorm")]
@@ -383,10 +425,19 @@
 (deftest test-executor-assignments
   (with-simulated-time-local-cluster[cluster :daemon-conf {SUPERVISOR-ENABLE 
false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (let [nimbus (:nimbus cluster)
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3 :conf {TOPOLOGY-TASKS 5})}
-                    {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 8 :conf {TOPOLOGY-TASKS 2})
-                     "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) 
:parallelism-hint 3)})
+          topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails
+                           (TestPlannerSpout. true) (Integer. 3)
+                           {TOPOLOGY-TASKS 5})}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareNoneGrouping)}
+                           (TestPlannerBolt.) (Integer. 8)
+                           {TOPOLOGY-TASKS 2})
+                     "3" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "2" nil)
+                            (Thrift/prepareNoneGrouping)}
+                           (TestPlannerBolt.) (Integer. 3))})
           _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} 
topology)
           _ (advance-cluster-time cluster 11)
           task-info (storm-component->task-info cluster "mystorm")
@@ -408,12 +459,21 @@
                        :daemon-conf {SUPERVISOR-ENABLE false 
TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (let [state (:storm-cluster-state cluster)
           nimbus (:nimbus cluster)
-          topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 21)}
-                     {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 9)
-                      "3" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 2)
-                      "4" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 10)}
-                     )
+          topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails
+                            (TestPlannerSpout. true) (Integer. 21))}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareNoneGrouping)}
+                            (TestPlannerBolt.) (Integer. 9))
+                      "3" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareNoneGrouping)}
+                            (TestPlannerBolt.) (Integer. 2))
+                      "4" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareNoneGrouping)}
+                            (TestPlannerBolt.) (Integer. 10))})
           _ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology)
           _ (advance-cluster-time cluster 11)
           task-info (storm-component->task-info cluster "test")]
@@ -436,10 +496,10 @@
     (stubbing [nimbus/user-groups ["alice-group"]]
       (letlocals
         (bind conf (:daemon-conf cluster))
-        (bind topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 4)}
-                         {}
-                         ))
+        (bind topology (Thrift/buildTopology
+                         {"1" (Thrift/prepareSpoutDetails
+                                (TestPlannerSpout. true) (Integer. 4))}
+                         {}))
         (bind state (:storm-cluster-state cluster))
         (submit-local-topology (:nimbus cluster) "test" 
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty 
"user.name")]} topology)
         (bind storm-id (get-storm-id state "test"))
@@ -532,10 +592,10 @@
                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (letlocals
       (bind conf (:daemon-conf cluster))
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 14)}
-                       {}
-                       ))
+      (bind topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 14))}
+                       {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster) "test" 
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
       (bind storm-id (get-storm-id state "test"))
@@ -626,10 +686,10 @@
                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (letlocals
       (bind conf (:daemon-conf cluster))
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 2)}
-                       {}
-                       ))
+      (bind topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 2))}
+                       {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} 
topology)
       (advance-cluster-time cluster 11)
@@ -747,10 +807,10 @@
       (add-supervisor cluster :ports 1 :id "a")
       (add-supervisor cluster :ports 1 :id "b")
       (bind conf (:daemon-conf cluster))
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 2)}
-                       {}
-                       ))
+      (bind topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 2))}
+                       {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} 
topology)
       (advance-cluster-time cluster 11)
@@ -803,8 +863,9 @@
                   TOPOLOGY-ACKER-EXECUTORS 0
                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (letlocals
-      (bind topology (thrift/mk-topology
-                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 9)}
+      (bind topology (Thrift/buildTopology
+                        {"1" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 9))}
                         {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4} 
topology)  ; distribution should be 2, 2, 2, 3 ideally
@@ -852,8 +913,9 @@
                   TOPOLOGY-ACKER-EXECUTORS 0
                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (letlocals
-      (bind topology (thrift/mk-topology
-                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+      (bind topology (Thrift/buildTopology
+                        {"1" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 3))}
                         {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster)
@@ -898,10 +960,10 @@
                   TOPOLOGY-ACKER-EXECUTORS 0
                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (letlocals
-      (bind topology (thrift/mk-topology
-                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
-                                :parallelism-hint 6
-                                :conf {TOPOLOGY-TASKS 12})}
+      (bind topology (Thrift/buildTopology
+                        {"1" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 6)
+                                {TOPOLOGY-TASKS 12})}
                         {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster)
@@ -976,14 +1038,17 @@
                   TOPOLOGY-ACKER-EXECUTORS 0
                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
     (letlocals
-      (bind topology (thrift/mk-topology
-                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+      (bind topology (Thrift/buildTopology
+                        {"1" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 3))}
                         {}))
-      (bind topology2 (thrift/mk-topology
-                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+      (bind topology2 (Thrift/buildTopology
+                        {"1" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 3))}
                         {}))
-      (bind topology3 (thrift/mk-topology
-                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+      (bind topology3 (Thrift/buildTopology
+                        {"1" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 3))}
                         {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster)
@@ -1023,18 +1088,20 @@
                   NIMBUS-EXECUTORS-PER-TOPOLOGY 8
                   NIMBUS-SLOTS-PER-TOPOLOGY 8}]
     (letlocals
-      (bind topology (thrift/mk-topology
-                        {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 1 :conf {TOPOLOGY-TASKS 1})}
+      (bind topology (Thrift/buildTopology
+                        {"1" (Thrift/prepareSpoutDetails
+                               (TestPlannerSpout. true) (Integer. 1)
+                               {TOPOLOGY-TASKS 1})}
                         {}))
       (is (thrown? InvalidTopologyException
         (submit-local-topology (:nimbus cluster)
                                "test/aaa"
                                {}
                                topology)))
-      (bind topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
-                                                 :parallelism-hint 16
-                                                 :conf {TOPOLOGY-TASKS 16})}
+      (bind topology (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails
+                             (TestPlannerSpout. true) (Integer. 16)
+                             {TOPOLOGY-TASKS 16})}
                       {}))
       (bind state (:storm-cluster-state cluster))
       (is (thrown? InvalidTopologyException
@@ -1042,10 +1109,10 @@
                                           "test"
                                           {TOPOLOGY-WORKERS 3}
                                           topology)))
-      (bind topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
-                                                 :parallelism-hint 5
-                                                 :conf {TOPOLOGY-TASKS 5})}
+      (bind topology (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails
+                             (TestPlannerSpout. true) (Integer. 5)
+                             {TOPOLOGY-TASKS 5})}
                       {}))
       (is (thrown? InvalidTopologyException
                    (submit-local-topology (:nimbus cluster)
@@ -1080,8 +1147,9 @@
                             STORM-LOCAL-DIR nimbus-dir}))
          (bind cluster-state (cluster/mk-storm-cluster-state conf))
          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-         (bind topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+         (bind topology (Thrift/buildTopology
+                         {"1" (Thrift/prepareSpoutDetails
+                                (TestPlannerSpout. true) (Integer. 3))}
                          {}))
          (submit-local-topology nimbus "t1" {} topology)
          (submit-local-topology nimbus "t2" {} topology)
@@ -1152,8 +1220,9 @@
                         STORM-LOCAL-DIR nimbus-dir}))
           (bind cluster-state (cluster/mk-storm-cluster-state conf))
           (bind nimbus (nimbus/service-handler conf 
(nimbus/standalone-nimbus)))
-          (bind topology (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+          (bind topology (Thrift/buildTopology
+                           {"1" (Thrift/prepareSpoutDetails
+                                  (TestPlannerSpout. true) (Integer. 3))}
                            {}))
 
           (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
@@ -1204,7 +1273,7 @@
                           
"org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
     (let [
           nimbus (:nimbus cluster)
-          topology (thrift/mk-topology {} {})
+          topology (Thrift/buildTopology {} {})
          ]
       (is (thrown? AuthorizationException
           (submit-local-topology-with-opts nimbus "mystorm" {} topology
@@ -1220,7 +1289,7 @@
                           
"org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
     (let [
           nimbus (:nimbus cluster)
-          topology (thrift/mk-topology {} {})
+          topology (Thrift/buildTopology {} {})
          ]
       ; Fake good authorization as part of setup.
       (mocking [nimbus/check-authorization!]
@@ -1247,7 +1316,7 @@
                        :daemon-conf {NIMBUS-AUTHORIZER 
"org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
     (let [nimbus (:nimbus cluster)
           topology-name "test-nimbus-check-autho-params"
-          topology (thrift/mk-topology {} {})]
+          topology (Thrift/buildTopology {} {})]
 
       (submit-local-topology-with-opts nimbus topology-name {} topology
           (SubmitOptions. TopologyInitialStatus/INACTIVE))
@@ -1432,7 +1501,7 @@
 (deftest test-validate-topo-config-on-submit
   (with-local-cluster [cluster]
     (let [nimbus (:nimbus cluster)
-          topology (thrift/mk-topology {} {})
+          topology (Thrift/buildTopology {} {})
           bad-config {"topology.isolate.machines" "2"}]
       ; Fake good authorization as part of setup.
       (mocking [nimbus/check-authorization!]
@@ -1453,8 +1522,9 @@
         (bind cluster-state (cluster/mk-storm-cluster-state conf))
         (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
         (Time/sleepSecs 1)
-        (bind topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+        (bind topology (Thrift/buildTopology
+                         {"1" (Thrift/prepareSpoutDetails
+                                (TestPlannerSpout. true) (Integer. 3))}
                          {}))
         (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} 
topology)
         ; make transition for topology t1 to be killed -> nimbus applies this 
event to cluster state
@@ -1486,8 +1556,9 @@
           (bind nimbus (nimbus/service-handler conf 
(nimbus/standalone-nimbus)))
           (bind notifier (InMemoryTopologyActionNotifier.))
           (Time/sleepSecs 1)
-          (bind topology (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+          (bind topology (Thrift/buildTopology
+                           {"1" (Thrift/prepareSpoutDetails
+                                  (TestPlannerSpout. true) (Integer. 3))}
                            {}))
           (submit-local-topology nimbus "test-notification" 
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
 
@@ -1512,8 +1583,9 @@
 (deftest test-debug-on-component
   (with-local-cluster [cluster]
     (let [nimbus (:nimbus cluster)
-          topology (thrift/mk-topology
-                     {"spout" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+          topology (Thrift/buildTopology
+                     {"spout" (Thrift/prepareSpoutDetails
+                                (TestPlannerSpout. true) (Integer. 3))}
                      {})]
         (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
         (.debug nimbus "t1" "spout" true 100))))
@@ -1521,8 +1593,9 @@
 (deftest test-debug-on-global
   (with-local-cluster [cluster]
     (let [nimbus (:nimbus cluster)
-          topology (thrift/mk-topology
-                     {"spout" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+          topology (Thrift/buildTopology
+                     {"spout" (Thrift/prepareSpoutDetails
+                                (TestPlannerSpout. true) (Integer. 3))}
                      {})]
       (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
       (.debug nimbus "t1" "" true 100))))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
 
b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
index f613a5b..4ca0721 100644
--- 
a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ 
b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
@@ -15,7 +15,8 @@
 ;; limitations under the License.
 (ns org.apache.storm.scheduler.resource-aware-scheduler-test
   (:use [clojure test])
-  (:use [org.apache.storm util config testing thrift])
+  (:use [org.apache.storm util config testing])
+  (:use [org.apache.storm.internal thrift])
   (:require [org.apache.storm.util :refer [map-val]])
   (:require [org.apache.storm.daemon [nimbus :as nimbus]])
   (:import [org.apache.storm.generated StormTopology]

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj 
b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 9c31ddf..345cb24 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -31,10 +31,12 @@
            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller
                                                  UtilsInstaller])
   (:import [java.nio.file.attribute FileAttribute])
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils])
   (:use [org.apache.storm config testing util timer log])
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as 
supervisor]]
-            [org.apache.storm [thrift :as thrift] [cluster :as cluster]])
+            [org.apache.storm [cluster :as cluster]])
   (:use [conjure core])
   (:require [clojure.java.io :as io]))
 
@@ -103,8 +105,9 @@
                   SUPERVISOR-WORKER-TIMEOUT-SECS 15
                   SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
     (letlocals
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 4)}
+      (bind topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 4))}
                        {}))
       (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
       (bind changed (capture-changed-workers
@@ -156,11 +159,13 @@
                   SUPERVISOR-WORKER-TIMEOUT-SECS 15
                   SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
     (letlocals
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 4)}
+      (bind topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 4))}
                        {}))
-      (bind topology2 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+      (bind topology2 (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 3))}
                        {}))
       (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
       (bind sup2 (add-supervisor cluster :id "sup2" :ports [1 2]))
@@ -270,8 +275,9 @@
       (check-heartbeat cluster "sup" 3)
       (advance-cluster-time cluster 15)
       (check-heartbeat cluster "sup" 3)
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 4)}
+      (bind topology (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 4))}
                        {}))
       ;; prevent them from launching by capturing them
       (capture-changed-workers
@@ -646,7 +652,7 @@
           (supervisor/supervisor-data auth-conf nil fake-isupervisor)
           (verify-call-times-for cluster/mk-storm-cluster-state 1)
           (verify-first-call-args-for-indices cluster/mk-storm-cluster-state 
[2]
-              expected-acls)))))
+              expected-acls))))))
 
   (deftest test-write-log-metadata
     (testing "supervisor writes correct data to logs metadata file"
@@ -769,66 +775,68 @@
             childopts-with-ids (supervisor/substitute-childopts childopts 
worker-id topology-id port mem-onheap)]
         (is (= expected-childopts childopts-with-ids)))))
 
-  (deftest test-retry-read-assignments
-    (with-simulated-time-local-cluster [cluster
-                                        :supervisors 0
-                                        :ports-per-supervisor 2
-                                        :daemon-conf 
{ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                                                      NIMBUS-MONITOR-FREQ-SECS 
10
-                                                      
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                      TOPOLOGY-ACKER-EXECUTORS 
0}]
-      (letlocals
-        (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-        (bind topology1 (thrift/mk-topology
-                          {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 2)}
-                          {}))
-        (bind topology2 (thrift/mk-topology
-                          {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 2)}
-                          {}))
-        (bind state (:storm-cluster-state cluster))
-        (bind changed (capture-changed-workers
-                        (submit-mocked-assignment
-                          (:nimbus cluster)
-                          (:storm-cluster-state cluster)
-                          "topology1"
-                          {TOPOLOGY-WORKERS 2}
-                          topology1
-                          {1 "1"
-                           2 "1"}
-                          {[1 1] ["sup1" 1]
-                           [2 2] ["sup1" 2]}
-                          {["sup1" 1] [0.0 0.0 0.0]
-                           ["sup1" 2] [0.0 0.0 0.0]
-                           })
-                        (submit-mocked-assignment
-                          (:nimbus cluster)
-                          (:storm-cluster-state cluster)
-                          "topology2"
-                          {TOPOLOGY-WORKERS 2}
-                          topology2
-                          {1 "1"
-                           2 "1"}
-                          {[1 1] ["sup1" 1]
-                           [2 2] ["sup1" 2]}
-                          {["sup1" 1] [0.0 0.0 0.0]
-                           ["sup1" 2] [0.0 0.0 0.0]
-                           })
-                        ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
-                        (.rebalance (:nimbus cluster) "topology1" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
-                        ))
-        (is (empty? (:launched changed)))
-        (bind options (RebalanceOptions.))
-        (.set_wait_secs options 0)
-        (bind changed (capture-changed-workers
-                        (.rebalance (:nimbus cluster) "topology2" options)
-                        (advance-cluster-time cluster 10)
-                        (heartbeat-workers cluster "sup1" [1 2 3 4])
-                        (advance-cluster-time cluster 10)
-                        ))
-        (validate-launched-once (:launched changed)
-          {"sup1" [1 2]}
-          (get-storm-id (:storm-cluster-state cluster) "topology1"))
-        (validate-launched-once (:launched changed)
-          {"sup1" [3 4]}
-          (get-storm-id (:storm-cluster-state cluster) "topology2"))
-        ))))
+(deftest test-retry-read-assignments
+  (with-simulated-time-local-cluster [cluster
+                                      :supervisors 0
+                                      :ports-per-supervisor 2
+                                      :daemon-conf 
{ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+                                                    NIMBUS-MONITOR-FREQ-SECS 10
+                                                    
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                    TOPOLOGY-ACKER-EXECUTORS 
0}]
+    (letlocals
+     (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+     (bind topology1 (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails
+                             (TestPlannerSpout. true) (Integer. 2))}
+                      {}))
+     (bind topology2 (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails
+                             (TestPlannerSpout. true) (Integer. 2))}
+                      {}))
+     (bind state (:storm-cluster-state cluster))
+     (bind changed (capture-changed-workers
+                    (submit-mocked-assignment
+                     (:nimbus cluster)
+                     (:storm-cluster-state cluster)
+                     "topology1"
+                     {TOPOLOGY-WORKERS 2}
+                     topology1
+                     {1 "1"
+                      2 "1"}
+                     {[1 1] ["sup1" 1]
+                      [2 2] ["sup1" 2]}
+                     {["sup1" 1] [0.0 0.0 0.0]
+                      ["sup1" 2] [0.0 0.0 0.0]
+                      })
+                    (submit-mocked-assignment
+                     (:nimbus cluster)
+                     (:storm-cluster-state cluster)
+                     "topology2"
+                     {TOPOLOGY-WORKERS 2}
+                     topology2
+                     {1 "1"
+                      2 "1"}
+                     {[1 1] ["sup1" 1]
+                      [2 2] ["sup1" 2]}
+                     {["sup1" 1] [0.0 0.0 0.0]
+                      ["sup1" 2] [0.0 0.0 0.0]
+                      })
+                    ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
+                    (.rebalance (:nimbus cluster) "topology1" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
+                    ))
+     (is (empty? (:launched changed)))
+     (bind options (RebalanceOptions.))
+     (.set_wait_secs options 0)
+     (bind changed (capture-changed-workers
+                    (.rebalance (:nimbus cluster) "topology2" options)
+                    (advance-cluster-time cluster 10)
+                    (heartbeat-workers cluster "sup1" [1 2 3 4])
+                    (advance-cluster-time cluster 10)
+                    ))
+     (validate-launched-once (:launched changed)
+                             {"sup1" [1 2]}
+                             (get-storm-id (:storm-cluster-state cluster) 
"topology1"))
+     (validate-launched-once (:launched changed)
+                             {"sup1" [3 4]}
+                             (get-storm-id (:storm-cluster-state cluster) 
"topology2"))
+     )))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj 
b/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj
index 543db09..99ad957 100644
--- a/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj
+++ b/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj
@@ -15,9 +15,11 @@
 ;; limitations under the License.
 (ns org.apache.storm.tick-tuple-test
   (:use [clojure test])
-  (:use [org.apache.storm testing clojure config])
+  (:use [org.apache.storm testing config])
+  (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm [thrift :as thrift]]))
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils]))
 
 (defbolt noop-bolt ["tuple"] {:prepare true}
   [conf context collector]
@@ -31,9 +33,12 @@
 
 (deftest test-tick-tuple-works-with-system-bolt
   (with-simulated-time-local-cluster [cluster]
-    (let [topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec noop-spout)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["tuple"]} noop-bolt)})]
+    (let [topology (Thrift/buildTopology
+                    {"1" (Thrift/prepareSpoutDetails noop-spout)}
+                    {"2" (Thrift/prepareBoltDetails
+                           {(Utils/getGlobalStreamId "1" nil)
+                            (Thrift/prepareFieldsGrouping ["tuple"])}
+                           noop-bolt)})]
       (try
         (submit-local-topology (:nimbus cluster)
                                "test"

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/transactional_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/transactional_test.clj 
b/storm-core/test/clj/org/apache/storm/transactional_test.clj
index dd46a7d..b8af518 100644
--- a/storm-core/test/clj/org/apache/storm/transactional_test.clj
+++ b/storm-core/test/clj/org/apache/storm/transactional_test.clj
@@ -36,7 +36,8 @@
   (:import [org.mockito Matchers Mockito])
   (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [java.util HashMap Collections ArrayList])
-  (:use [org.apache.storm testing util config clojure])
+  (:use [org.apache.storm testing util config])
+  (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common]))
 
 ;; Testing TODO:

Reply via email to