STORM-166: renaming bt-tracker to code-distributor (mk-bt-tracker -> mk-code-distributor).
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1e8782f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1e8782f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1e8782f Branch: refs/heads/0.11.x-branch Commit: c1e8782f7cfb3524b6c0d51fe48669fd09b87527 Parents: 58667be Author: Parth Brahmbhatt <[email protected]> Authored: Fri Jan 9 13:52:33 2015 -0500 Committer: Parth Brahmbhatt <[email protected]> Committed: Fri Jan 9 13:52:33 2015 -0500 ---------------------------------------------------------------------- .../src/clj/backtype/storm/daemon/nimbus.clj | 24 ++++++++++---------- .../clj/backtype/storm/daemon/supervisor.clj | 16 ++++++------- .../test/clj/backtype/storm/nimbus_test.clj | 2 +- .../storm/security/auth/nimbus_auth_test.clj | 1 - .../test/clj/backtype/storm/supervisor_test.clj | 2 +- 5 files changed, 22 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 1f93be6..d1f5c31 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -62,7 +62,7 @@ scheduler )) -(defmulti mk-bt-tracker cluster-mode) +(defmulti mk-code-distributor cluster-mode) (defmulti sync-code cluster-mode) (defnk is-leader [nimbus :throw-exception true] @@ -100,7 +100,7 @@ )) :scheduler (mk-scheduler conf inimbus) :leader-elector (zk-leader-elector conf) - :bt-tracker (mk-bt-tracker conf) + :bt-tracker (mk-code-distributor conf) :id->sched-status (atom {}) :cred-renewers (AuthUtils/GetCredentialRenewers conf) :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf) @@ -336,15 
+336,15 @@ (setup-jar conf tmp-jar-location stormroot) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) - (if (:bt-tracker nimbus) (.upload (:bt-tracker nimbus) stormroot storm-id)) + (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id)) )) (defn- wait-for-desired-code-replication [nimbus conf storm-id] (let [min-replication-count (conf NIMBUS-MIN-REPLICATION-COUNT) max-replication-wait-time (conf NIMBUS-MAX-REPLICATION-WAIT-TIME-SEC) total-wait-time (atom 0) - current-replication-count (atom (if (:bt-tracker nimbus) (.getReplicationCount (:bt-tracker nimbus) storm-id) 0))] - (if (:bt-tracker nimbus) + current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))] + (if (:code-distributor nimbus) (while (and (> min-replication-count @current-replication-count) (or (= -1 max-replication-wait-time) (< @total-wait-time max-replication-wait-time))) @@ -353,7 +353,7 @@ min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time) (swap! total-wait-time inc) - (reset! current-replication-count (.getReplicationCount (:bt-tracker nimbus) storm-id)))) + (reset! current-replication-count (.getReplicationCount (:code-distributor nimbus) storm-id)))) (if (< min-replication-count @current-replication-count) (log-message "desired replication count " min-replication-count " achieved, current-replication-count" @current-replication-count) @@ -897,7 +897,7 @@ (when-not (empty? 
to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) - (if (:bt-tracker nimbus) (.cleanup (:bt-tracker nimbus) id)) + (if (:code-distributor nimbus) (.cleanup (:code-distributor nimbus) id)) (.teardown-heartbeats! storm-cluster-state id) (.teardown-topology-errors! storm-cluster-state id) (rmr (master-stormdist-root conf id)) @@ -1363,19 +1363,19 @@ (.cleanup (:downloaders nimbus)) (.cleanup (:uploaders nimbus)) (.close (:leader-elector nimbus)) - (if (:bt-tracker nimbus) (.close (:bt-tracker nimbus) (:conf nimbus))) + (if (:code-distributor nimbus) (.close (:code-distributor nimbus) (:conf nimbus))) (log-message "Shut down master") ) DaemonCommon (waiting? [this] (timer-waiting? (:timer nimbus)))))) -(defmethod mk-bt-tracker :distributed [conf] +(defmethod mk-code-distributor :distributed [conf] (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))] (.prepare code-distributor conf) code-distributor)) -(defmethod mk-bt-tracker :local [conf] +(defmethod mk-code-distributor :local [conf] nil) (defn download-code [conf nimbus storm-id host port] @@ -1386,8 +1386,8 @@ local-meta-file-path (master-storm-metafile-path tmp-root)] (FileUtils/forceMkdir (File. tmp-root)) (Utils/downloadFromHost conf remote-meta-file-path local-meta-file-path host port) - (if (:bt-tracker nimbus) - (.download (:bt-tracker nimbus) storm-id (File. local-meta-file-path))) + (if (:code-distributor nimbus) + (.download (:code-distributor nimbus) storm-id (File. local-meta-file-path))) (if (.exists (File. storm-root)) (FileUtils/forceDelete (File. storm-root))) (FileUtils/moveDirectory (File. tmp-root) (File. storm-root)) (.setup-code-distributor! 
storm-cluster-state storm-id (:nimbus-host-port-info nimbus)))) http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 3fcf7eb..ceb098e 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -32,7 +32,7 @@ (defmulti download-storm-code cluster-mode) (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) -(defmulti mk-bt-tracker cluster-mode) +(defmulti mk-code-distributor cluster-mode) ;; used as part of a map from port to this (defrecord LocalAssignment [storm-id executors]) @@ -302,7 +302,7 @@ )) :assignment-versions (atom {}) :sync-retry (atom 0) - :bt-tracker (mk-bt-tracker conf) + :bt-tracker (mk-code-distributor conf) }) (defn sync-processes [supervisor] @@ -342,8 +342,8 @@ ". State: " state ", Heartbeat: " (pr-str heartbeat)) (shutdown-worker supervisor id) - (if (:bt-tracker supervisor) - (.cleanup (:bt-tracker supervisor) id)) + (if (:code-distributor supervisor) + (.cleanup (:code-distributor supervisor) id)) )) (doseq [id (vals new-worker-ids)] @@ -552,8 +552,8 @@ supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)] (FileUtils/forceMkdir (File. tmproot)) (Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path) - (if (:bt-tracker supervisor) - (.download (:bt-tracker supervisor) storm-id (File. supervisor-meta-file-path))) + (if (:code-distributor supervisor) + (.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path))) (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot))) (FileUtils/moveDirectory (File. tmproot) (File. 
stormroot)) @@ -587,7 +587,7 @@ (storm-conf TOPOLOGY-USERS)))))}] (write-log-metadata-to-yaml-file! storm-id port data conf))) -(defmethod mk-bt-tracker :distributed [conf] +(defmethod mk-code-distributor :distributed [conf] (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))] (.prepare code-distributor conf) code-distributor)) @@ -719,7 +719,7 @@ ) ))) -(defmethod mk-bt-tracker :local [conf] nil) +(defmethod mk-code-distributor :local [conf] nil) (defmethod launch-worker :local [supervisor storm-id port worker-id] http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj index 1a94049..c937aaa 100644 --- a/storm-core/test/clj/backtype/storm/nimbus_test.clj +++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj @@ -1245,7 +1245,7 @@ uptime-computer nil new-instance nil mk-timer nil - nimbus/mk-bt-tracker nil + nimbus/mk-code-distributor nil zk-leader-elector nil nimbus/mk-scheduler nil] (nimbus/nimbus-data auth-conf fake-inimbus) http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj index a776693..2787461 100644 --- a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj +++ b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj @@ -48,7 +48,6 @@ nimbus-server (ThriftServer. (:daemon-conf cluster-map) (Nimbus$Processor. (:nimbus cluster-map)) ThriftConnectionType/NIMBUS)] - (Thread/sleep 2000) (.addShutdownHook (Runtime/getRuntime) (Thread. 
(fn [] (.stop nimbus-server)))) (.start (Thread. #(.serve nimbus-server))) (wait-for-condition #(.isServing nimbus-server)) http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj index 6b4328a..9328769 100644 --- a/storm-core/test/clj/backtype/storm/supervisor_test.clj +++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj @@ -481,7 +481,7 @@ cluster/mk-storm-cluster-state nil supervisor-state nil local-hostname nil - supervisor/mk-bt-tracker nil + supervisor/mk-code-distributor nil mk-timer nil] (supervisor/supervisor-data auth-conf nil fake-isupervisor) (verify-call-times-for cluster/mk-storm-cluster-state 1)
