http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj deleted file mode 100644 index 0cb2f52..0000000 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ /dev/null @@ -1,701 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns backtype.storm.testing - (:require [backtype.storm.daemon - [nimbus :as nimbus] - [supervisor :as supervisor] - [common :as common] - [worker :as worker] - [executor :as executor]]) - (:require [backtype.storm [process-simulator :as psim]]) - (:import [org.apache.commons.io FileUtils]) - (:import [java.io File]) - (:import [java.util HashMap ArrayList]) - (:import [java.util.concurrent.atomic AtomicInteger]) - (:import [java.util.concurrent ConcurrentHashMap]) - (:import [backtype.storm.utils Time Utils RegisteredGlobalState]) - (:import [backtype.storm.tuple Fields Tuple TupleImpl]) - (:import [backtype.storm.task TopologyContext]) - (:import [backtype.storm.generated GlobalStreamId Bolt KillOptions]) - (:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple - TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker - TestWordSpout MemoryTransactionalSpout]) - (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils]) - (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo - ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus - KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo - ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice]) - (:import [backtype.storm.transactional TransactionalSpoutCoordinator]) - (:import [backtype.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor]) - (:import [backtype.storm.tuple Tuple]) - (:import [backtype.storm.generated StormTopology]) - (:import [backtype.storm.task TopologyContext]) - (:require [backtype.storm [zookeeper :as zk]]) - (:require [backtype.storm.messaging.loader :as msg-loader]) - (:require [backtype.storm.daemon.acker :as acker]) - (:use [backtype.storm cluster util thrift config log local-state])) - -(defn feeder-spout - [fields] - (FeederSpout. (Fields. fields))) - -(defn local-temp-path - [] - (str (System/getProperty "java.io.tmpdir") (if-not on-windows? "/") (uuid))) - -(defn delete-all - [paths] - (dorun - (for [t paths] - (if (.exists (File. t)) - (try - (FileUtils/forceDelete (File. t)) - (catch Exception e - (log-message (.getMessage e)))))))) - -(defmacro with-local-tmp - [[& tmp-syms] & body] - (let [tmp-paths (mapcat (fn [t] [t `(local-temp-path)]) tmp-syms)] - `(let [~@tmp-paths] - (try - ~@body - (finally - (delete-all ~(vec tmp-syms))))))) - -(defn start-simulating-time! - [] - (Time/startSimulating)) - -(defn stop-simulating-time! - [] - (Time/stopSimulating)) - - (defmacro with-simulated-time - [& body] - `(try - (start-simulating-time!) - ~@body - (finally - (stop-simulating-time!)))) - -(defn advance-time-ms! [ms] - (Time/advanceTime ms)) - -(defn advance-time-secs! [secs] - (advance-time-ms! (* (long secs) 1000))) - -(defnk add-supervisor - [cluster-map :ports 2 :conf {} :id nil] - (let [tmp-dir (local-temp-path) - port-ids (if (sequential? ports) - ports - (doall (repeatedly ports (:port-counter cluster-map)))) - supervisor-conf (merge (:daemon-conf cluster-map) - conf - {STORM-LOCAL-DIR tmp-dir - SUPERVISOR-SLOTS-PORTS port-ids}) - id-fn (if id (fn [] id) supervisor/generate-supervisor-id) - daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))] - (swap! (:supervisors cluster-map) conj daemon) - (swap! (:tmp-dirs cluster-map) conj tmp-dir) - daemon)) - -(defn mk-shared-context [conf] - (if-not (conf STORM-LOCAL-MODE-ZMQ) - (msg-loader/mk-local-context))) - -(defn start-nimbus-daemon [conf nimbus] - (let [server (ThriftServer. conf (Nimbus$Processor. nimbus) - ThriftConnectionType/NIMBUS) - nimbus-thread (Thread. (fn [] (.serve server)))] - (log-message "Starting Nimbus server...") - (.start nimbus-thread) - server)) - - -;; returns map containing cluster info -;; local dir is always overridden in maps -;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter -;; if need to customize amt of ports more, can use add-supervisor calls afterwards -(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false] - (let [zk-tmp (local-temp-path) - [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) - (zk/mk-inprocess-zookeeper zk-tmp)) - daemon-conf (merge (read-storm-config) - {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true - ZMQ-LINGER-MILLIS 0 - TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false - TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 - STORM-CLUSTER-MODE "local" - BLOBSTORE-SUPERUSER (System/getProperty "user.name")} - (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) - {STORM-ZOOKEEPER-PORT zk-port - STORM-ZOOKEEPER-SERVERS ["localhost"]}) - daemon-conf) - nimbus-tmp (local-temp-path) - port-counter (mk-counter supervisor-slot-port-min) - nimbus (nimbus/service-handler - (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) - (if inimbus inimbus (nimbus/standalone-nimbus))) - context (mk-shared-context daemon-conf) - nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil) - cluster-map {:nimbus nimbus - :port-counter port-counter - :daemon-conf daemon-conf - :supervisors (atom []) - :state (mk-distributed-cluster-state daemon-conf) - :storm-cluster-state (mk-storm-cluster-state daemon-conf) - :tmp-dirs (atom [nimbus-tmp zk-tmp]) - :zookeeper (if (not-nil? zk-handle) zk-handle) - :shared-context context - :nimbus-thrift-server nimbus-thrift-server} - supervisor-confs (if (sequential? supervisors) - supervisors - (repeat supervisors {}))] - - (doseq [sc supervisor-confs] - (add-supervisor cluster-map :ports ports-per-supervisor :conf sc)) - cluster-map)) - -(defn get-supervisor [cluster-map supervisor-id] - (let [finder-fn #(= (.get-id %) supervisor-id)] - (find-first finder-fn @(:supervisors cluster-map)))) - -(defn kill-supervisor [cluster-map supervisor-id] - (let [finder-fn #(= (.get-id %) supervisor-id) - supervisors @(:supervisors cluster-map) - sup (find-first finder-fn - supervisors)] - ;; tmp-dir will be taken care of by shutdown - (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors)) - (.shutdown sup))) - -(defn kill-local-storm-cluster [cluster-map] - (.shutdown (:nimbus cluster-map)) - (if (not-nil? (:nimbus-thrift-server cluster-map)) - (do - (log-message "shutting down thrift server") - (try - (.stop (:nimbus-thrift-server cluster-map)) - (catch Exception e (log-message "failed to stop thrift"))) - )) - (.close (:state cluster-map)) - (.disconnect (:storm-cluster-state cluster-map)) - (doseq [s @(:supervisors cluster-map)] - (.shutdown-all-workers s) - ;; race condition here? will it launch the workers again? - (supervisor/kill-supervisor s)) - (psim/kill-all-processes) - (if (not-nil? (:zookeeper cluster-map)) - (do - (log-message "Shutting down in process zookeeper") - (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map)) - (log-message "Done shutting down in process zookeeper"))) - (doseq [t @(:tmp-dirs cluster-map)] - (log-message "Deleting temporary path " t) - (try - (rmr t) - ;; on windows, the host process still holds lock on the logfile - (catch Exception e (log-message (.getMessage e)))) )) - -(def TEST-TIMEOUT-MS - (let [timeout (System/getenv "STORM_TEST_TIMEOUT_MS")] - (parse-int (if timeout timeout "5000")))) - -(defmacro while-timeout [timeout-ms condition & body] - `(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)] - (log-debug "Looping until " '~condition) - (while ~condition - (when (> (System/currentTimeMillis) end-time#) - (let [thread-dump# (Utils/threadDump)] - (log-message "Condition " '~condition " not met in " ~timeout-ms "ms") - (log-message thread-dump#) - (throw (AssertionError. (str "Test timed out (" ~timeout-ms "ms) " '~condition))))) - ~@body) - (log-debug "Condition met " '~condition))) - -(defn wait-for-condition - ([apredicate] - (wait-for-condition TEST-TIMEOUT-MS apredicate)) - ([timeout-ms apredicate] - (while-timeout timeout-ms (not (apredicate)) - (Time/sleep 100)))) - -(defn wait-until-cluster-waiting - "Wait until the cluster is idle. Should be used with time simulation." - ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS)) - ([cluster-map timeout-ms] - ;; wait until all workers, supervisors, and nimbus is waiting - (let [supervisors @(:supervisors cluster-map) - workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes)) - daemons (concat - [(:nimbus cluster-map)] - supervisors - ; because a worker may already be dead - workers)] - (while-timeout timeout-ms (not (every? (memfn waiting?) daemons)) - (Thread/sleep (rand-int 20)) - ;; (doseq [d daemons] - ;; (if-not ((memfn waiting?) d) - ;; (println d))) - )))) - -(defn advance-cluster-time - ([cluster-map secs increment-secs] - (loop [left secs] - (when (> left 0) - (let [diff (min left increment-secs)] - (advance-time-secs! diff) - (wait-until-cluster-waiting cluster-map) - (recur (- left diff)))))) - ([cluster-map secs] - (advance-cluster-time cluster-map secs 1))) - -(defmacro with-local-cluster - [[cluster-sym & args] & body] - `(let [~cluster-sym (mk-local-storm-cluster ~@args)] - (try - ~@body - (catch Throwable t# - (log-error t# "Error in cluster") - (throw t#)) - (finally - (let [keep-waiting?# (atom true) - f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))] - (kill-local-storm-cluster ~cluster-sym) - (reset! keep-waiting?# false) - @f#))))) - -(defmacro with-simulated-time-local-cluster - [& args] - `(with-simulated-time - (with-local-cluster ~@args))) - -(defmacro with-inprocess-zookeeper - [port-sym & body] - `(with-local-tmp [tmp#] - (let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)] - (try - ~@body - (finally - (zk/shutdown-inprocess-zookeeper zks#)))))) - -(defn submit-local-topology - [nimbus storm-name conf topology] - (when-not (Utils/isValidConf conf) - (throw (IllegalArgumentException. "Topology conf is not json-serializable"))) - (.submitTopology nimbus storm-name nil (to-json conf) topology)) - -(defn submit-local-topology-with-opts - [nimbus storm-name conf topology submit-opts] - (when-not (Utils/isValidConf conf) - (throw (IllegalArgumentException. "Topology conf is not json-serializable"))) - (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts)) - -(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources] - (fn [existing-assignments] - (let [topology-id (common/get-storm-id storm-cluster-state storm-name) - existing-assignments (into {} (for [[tid assignment] existing-assignments] - {tid (:worker->resources assignment)})) - new-assignments (assoc existing-assignments topology-id worker->resources)] - new-assignments))) - -(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port] - (fn [new-scheduler-assignments existing-assignments] - (let [topology-id (common/get-storm-id storm-cluster-state storm-name) - existing-assignments (into {} (for [[tid assignment] existing-assignments] - {tid (:executor->node+port assignment)})) - new-assignments (assoc existing-assignments topology-id executor->node+port)] - new-assignments))) - -(defn mocked-compute-new-scheduler-assignments [] - (fn [nimbus existing-assignments topologies scratch-topology-id] - existing-assignments)) - -(defn submit-mocked-assignment - [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources] - (with-var-roots [common/storm-task-info (fn [& ignored] task->component) - nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments) - nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources - storm-cluster-state - storm-name - worker->resources) - nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port - storm-cluster-state - storm-name - executor->node+port)] - (submit-local-topology nimbus storm-name conf topology))) - -(defn mk-capture-launch-fn [capture-atom] - (fn [supervisor storm-id port worker-id mem-onheap] - (let [supervisor-id (:supervisor-id supervisor) - conf (:conf supervisor) - existing (get @capture-atom [supervisor-id port] [])] - (set-worker-user! conf worker-id "") - (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id))))) - -(defn find-worker-id - [supervisor-conf port] - (let [supervisor-state (supervisor-state supervisor-conf) - worker->port (ls-approved-workers supervisor-state)] - (first ((reverse-map worker->port) port)))) - -(defn find-worker-port - [supervisor-conf worker-id] - (let [supervisor-state (supervisor-state supervisor-conf) - worker->port (ls-approved-workers supervisor-state)] - (worker->port worker-id))) - -(defn mk-capture-shutdown-fn - [capture-atom] - (let [existing-fn supervisor/shutdown-worker] - (fn [supervisor worker-id] - (let [conf (:conf supervisor) - supervisor-id (:supervisor-id supervisor) - port (find-worker-port conf worker-id) - existing (get @capture-atom [supervisor-id port] 0)] - (swap! capture-atom assoc [supervisor-id port] (inc existing)) - (existing-fn supervisor worker-id))))) - -(defmacro capture-changed-workers - [& body] - `(let [launch-captured# (atom {}) - shutdown-captured# (atom {})] - (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#) - supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)] - ~@body - {:launched @launch-captured# - :shutdown @shutdown-captured#}))) - -(defmacro capture-launched-workers - [& body] - `(:launched (capture-changed-workers ~@body))) - -(defmacro capture-shutdown-workers - [& body] - `(:shutdown (capture-changed-workers ~@body))) - -(defnk aggregated-stat - [cluster-map storm-name stat-key :component-ids nil] - (let [state (:storm-cluster-state cluster-map) - nimbus (:nimbus cluster-map) - storm-id (common/get-storm-id state storm-name) - component->tasks (reverse-map - (common/storm-task-info - (.getUserTopology nimbus storm-id) - (from-json (.getTopologyConf nimbus storm-id)))) - component->tasks (if component-ids - (select-keys component->tasks component-ids) - component->tasks) - task-ids (apply concat (vals component->tasks)) - assignment (.assignment-info state storm-id nil) - taskbeats (.taskbeats state storm-id (:task->node+port assignment)) - heartbeats (dofor [id task-ids] (get taskbeats id)) - stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))] - (reduce + stats))) - -(defn emitted-spout-tuples - [cluster-map topology storm-name] - (aggregated-stat - cluster-map - storm-name - :emitted - :component-ids (keys (.get_spouts topology)))) - -(defn transferred-tuples - [cluster-map storm-name] - (aggregated-stat cluster-map storm-name :transferred)) - -(defn acked-tuples - [cluster-map storm-name] - (aggregated-stat cluster-map storm-name :acked)) - -(defn simulate-wait - [cluster-map] - (if (Time/isSimulating) - (advance-cluster-time cluster-map 10) - (Thread/sleep 100))) - -(defprotocol CompletableSpout - (exhausted? - [this] - "Whether all the tuples for this spout have been completed.") - (cleanup - [this] - "Cleanup any global state kept") - (startup - [this] - "Prepare the spout (globally) before starting the topology")) - -(extend-type FixedTupleSpout - CompletableSpout - (exhausted? [this] - (= (-> this .getSourceTuples count) - (.getCompleted this))) - (cleanup [this] - (.cleanup this)) - (startup [this])) - -(extend-type TransactionalSpoutCoordinator - CompletableSpout - (exhausted? [this] - (exhausted? (.getSpout this))) - (cleanup [this] - (cleanup (.getSpout this))) - (startup [this] - (startup (.getSpout this)))) - -(extend-type PartitionedTransactionalSpoutExecutor - CompletableSpout - (exhausted? [this] - (exhausted? (.getPartitionedSpout this))) - (cleanup [this] - (cleanup (.getPartitionedSpout this))) - (startup [this] - (startup (.getPartitionedSpout this)))) - -(extend-type MemoryTransactionalSpout - CompletableSpout - (exhausted? [this] - (.isExhaustedTuples this)) - (cleanup [this] - (.cleanup this)) - (startup [this] - (.startup this))) - -(defn spout-objects [spec-map] - (for [[_ spout-spec] spec-map] - (-> spout-spec - .get_spout_object - deserialized-component-object))) - -(defn capture-topology - [topology] - (let [topology (.deepCopy topology) - spouts (.get_spouts topology) - bolts (.get_bolts topology) - all-streams (apply concat - (for [[id spec] (merge (clojurify-structure spouts) - (clojurify-structure bolts))] - (for [[stream info] (.. spec get_common get_streams)] - [(GlobalStreamId. id stream) (.is_direct info)]))) - capturer (TupleCaptureBolt.)] - (.set_bolts topology - (assoc (clojurify-structure bolts) - (uuid) - (Bolt. - (serialize-component-object capturer) - (mk-plain-component-common (into {} (for [[id direct?] all-streams] - [id (if direct? - (mk-direct-grouping) - (mk-global-grouping))])) - {} - nil)))) - {:topology topology - :capturer capturer})) - -;; TODO: mock-sources needs to be able to mock out state spouts as well -(defnk complete-topology - [cluster-map topology - :mock-sources {} - :storm-conf {} - :cleanup-state true - :topology-name nil - :timeout-ms TEST-TIMEOUT-MS] - ;; TODO: the idea of mocking for transactional topologies should be done an - ;; abstraction level above... should have a complete-transactional-topology for this - (let [{topology :topology capturer :capturer} (capture-topology topology) - storm-name (or topology-name (str "topologytest-" (uuid))) - state (:storm-cluster-state cluster-map) - spouts (.get_spouts topology) - replacements (map-val (fn [v] - (FixedTupleSpout. - (for [tup v] - (if (map? tup) - (FixedTuple. (:stream tup) (:values tup)) - tup)))) - mock-sources)] - (doseq [[id spout] replacements] - (let [spout-spec (get spouts id)] - (.set_spout_object spout-spec (serialize-component-object spout)))) - (doseq [spout (spout-objects spouts)] - (when-not (extends? CompletableSpout (.getClass spout)) - (throw (RuntimeException. (str "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " spout))))) - - (doseq [spout (spout-objects spouts)] - (startup spout)) - - (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology) - (advance-cluster-time cluster-map 11) - - (let [storm-id (common/get-storm-id state storm-name)] - ;;Give the topology time to come up without using it to wait for the spouts to complete - (simulate-wait cluster-map) - - (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts))) - (simulate-wait cluster-map)) - - (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0))) - (while-timeout timeout-ms (.assignment-info state storm-id nil) - (simulate-wait cluster-map)) - (when cleanup-state - (doseq [spout (spout-objects spouts)] - (cleanup spout)))) - - (if cleanup-state - (.getAndRemoveResults capturer) - (.getAndClearResults capturer)))) - -(defn read-tuples - ([results component-id stream-id] - (let [fixed-tuples (get results component-id [])] - (mapcat - (fn [ft] - (if (= stream-id (. ft stream)) - [(vec (. ft values))])) - fixed-tuples) - )) - ([results component-id] - (read-tuples results component-id Utils/DEFAULT_STREAM_ID))) - -(defn ms= - [& args] - (apply = (map multi-set args))) - -(def TRACKER-BOLT-ID "+++tracker-bolt") - -;; TODO: should override system-topology! and wrap everything there -(defn mk-tracked-topology - ([tracked-cluster topology] - (let [track-id (::track-id tracked-cluster) - ret (.deepCopy topology)] - (dofor [[_ bolt] (.get_bolts ret) - :let [obj (deserialized-component-object (.get_bolt_object bolt))]] - (.set_bolt_object bolt (serialize-component-object - (BoltTracker. obj track-id)))) - (dofor [[_ spout] (.get_spouts ret) - :let [obj (deserialized-component-object (.get_spout_object spout))]] - (.set_spout_object spout (serialize-component-object - (SpoutTracker. obj track-id)))) - {:topology ret - :last-spout-emit (atom 0) - :cluster tracked-cluster}))) - -(defn assoc-track-id - [cluster track-id] - (assoc cluster ::track-id track-id)) - -(defn increment-global! - [id key amt] - (-> (RegisteredGlobalState/getState id) - (get key) - (.addAndGet amt))) - -(defn global-amt - [id key] - (-> (RegisteredGlobalState/getState id) - (get key) - .get)) - -(defmacro with-tracked-cluster - [[cluster-sym & cluster-args] & body] - `(let [id# (uuid)] - (RegisteredGlobalState/setState - id# - (doto (ConcurrentHashMap.) - (.put "spout-emitted" (AtomicInteger. 0)) - (.put "transferred" (AtomicInteger. 0)) - (.put "processed" (AtomicInteger. 0)))) - (with-var-roots - [acker/mk-acker-bolt - (let [old# acker/mk-acker-bolt] - (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#))) - ;; critical that this particular function is overridden here, - ;; since the transferred stat needs to be incremented at the moment - ;; of tuple emission (and not on a separate thread later) for - ;; topologies to be tracked correctly. This is because "transferred" *must* - ;; be incremented before "processing". - executor/mk-executor-transfer-fn - (let [old# executor/mk-executor-transfer-fn] - (fn [& args#] - (let [transferrer# (apply old# args#)] - (fn [& args2#] - ;; (log-message "Transferring: " transfer-args#) - (increment-global! id# "transferred" 1) - (apply transferrer# args2#)))))] - (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args] - (let [~cluster-sym (assoc-track-id ~cluster-sym id#)] - ~@body))) - (RegisteredGlobalState/clearState id#))) - -(defn tracked-wait - "Waits until topology is idle and 'amt' more tuples have been emitted by spouts." - ([tracked-topology] - (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS)) - ([tracked-topology amt] - (tracked-wait tracked-topology amt TEST-TIMEOUT-MS)) - ([tracked-topology amt timeout-ms] - (let [target (+ amt @(:last-spout-emit tracked-topology)) - track-id (-> tracked-topology :cluster ::track-id) - waiting? (fn [] - (or (not= target (global-amt track-id "spout-emitted")) - (not= (global-amt track-id "transferred") - (global-amt track-id "processed"))))] - (while-timeout timeout-ms (waiting?) - ;; (println "Spout emitted: " (global-amt track-id "spout-emitted")) - ;; (println "Processed: " (global-amt track-id "processed")) - ;; (println "Transferred: " (global-amt track-id "transferred")) - (Thread/sleep (rand-int 200))) - (reset! (:last-spout-emit tracked-topology) target)))) - -(defnk test-tuple - [values - :stream Utils/DEFAULT_STREAM_ID - :component "component" - :fields nil] - (let [fields (or fields - (->> (iterate inc 1) - (take (count values)) - (map #(str "field" %)))) - spout-spec (mk-spout-spec* (TestWordSpout.) - {stream fields}) - topology (StormTopology. {component spout-spec} {} {}) - context (TopologyContext. - topology - (read-storm-config) - {(int 1) component} - {component [(int 1)]} - {component {stream (Fields. fields)}} - "test-storm-id" - nil - nil - (int 1) - nil - [(int 1)] - {} - {} - (HashMap.) - (HashMap.) - (atom false))] - (TupleImpl. context values 1 stream))) - -(defmacro with-timeout - [millis unit & body] - `(let [f# (future ~@body)] - (try - (.get f# ~millis ~unit) - (finally (future-cancel f#)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/testing4j.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/testing4j.clj b/storm-core/src/clj/backtype/storm/testing4j.clj deleted file mode 100644 index bc5dc57..0000000 --- a/storm-core/src/clj/backtype/storm/testing4j.clj +++ /dev/null @@ -1,184 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.testing4j - (:import [java.util Map List Collection ArrayList]) - (:require [backtype.storm [LocalCluster :as LocalCluster]]) - (:import [backtype.storm Config ILocalCluster LocalCluster]) - (:import [backtype.storm.generated StormTopology]) - (:import [backtype.storm.daemon nimbus]) - (:import [backtype.storm.testing TestJob MockedSources TrackedTopology - MkClusterParam CompleteTopologyParam MkTupleParam]) - (:import [backtype.storm.utils Utils]) - (:use [backtype.storm testing util log]) - (:gen-class - :name backtype.storm.Testing - :methods [^:static [completeTopology - [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology - backtype.storm.testing.CompleteTopologyParam] - java.util.Map] - ^:static [completeTopology - [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology] - java.util.Map] - ^:static [withSimulatedTime [Runnable] void] - ^:static [withLocalCluster [backtype.storm.testing.TestJob] void] - ^:static [withLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void] - ^:static [getLocalCluster [java.util.Map] backtype.storm.ILocalCluster] - ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.TestJob] void] - ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void] - ^:static [withTrackedCluster [backtype.storm.testing.TestJob] void] - ^:static [withTrackedCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void] - ^:static [readTuples [java.util.Map String String] java.util.List] - ^:static [readTuples [java.util.Map String] java.util.List] - ^:static [mkTrackedTopology [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology] backtype.storm.testing.TrackedTopology] - ^:static [trackedWait [backtype.storm.testing.TrackedTopology] void] - ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer] void] - ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer Integer] void] - ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer Integer] void] - ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer] void] - ^:static [multiseteq [java.util.Collection java.util.Collection] boolean] - ^:static [multiseteq [java.util.Map java.util.Map] boolean] - ^:static [testTuple [java.util.List] backtype.storm.tuple.Tuple] - ^:static [testTuple [java.util.List backtype.storm.testing.MkTupleParam] backtype.storm.tuple.Tuple]])) - -(defn -completeTopology - ([^ILocalCluster cluster ^StormTopology topology ^CompleteTopologyParam completeTopologyParam] - (let [mocked-sources (or (-> completeTopologyParam .getMockedSources .getData) {}) - storm-conf (or (.getStormConf completeTopologyParam) {}) - cleanup-state (or (.getCleanupState completeTopologyParam) true) - topology-name (.getTopologyName completeTopologyParam) - timeout-ms (or (.getTimeoutMs completeTopologyParam) TEST-TIMEOUT-MS)] - (complete-topology (.getState cluster) topology - :mock-sources mocked-sources - :storm-conf storm-conf - :cleanup-state cleanup-state - :topology-name topology-name - :timeout-ms timeout-ms))) - ([^ILocalCluster cluster ^StormTopology topology] - (-completeTopology cluster topology (CompleteTopologyParam.)))) - - -(defn -withSimulatedTime - [^Runnable code] - (with-simulated-time - (.run code))) - -(defmacro with-cluster - [cluster-type mkClusterParam code] - `(let [supervisors# (or (.getSupervisors ~mkClusterParam) 2) - ports-per-supervisor# (or (.getPortsPerSupervisor ~mkClusterParam) 3) - daemon-conf# (or (.getDaemonConf ~mkClusterParam) {})] - (~cluster-type [cluster# :supervisors supervisors# - :ports-per-supervisor ports-per-supervisor# - :daemon-conf daemon-conf#] - (let [cluster# (LocalCluster. cluster#)] - (.run ~code cluster#))))) - -(defn -withLocalCluster - ([^MkClusterParam mkClusterParam ^TestJob code] - (with-cluster with-local-cluster mkClusterParam code)) - ([^TestJob code] - (-withLocalCluster (MkClusterParam.) code))) - -(defn -getLocalCluster - ([^Map clusterConf] - (let [daemon-conf (get-in clusterConf ["daemon-conf"] {}) - supervisors (get-in clusterConf ["supervisors"] 2) - ports-per-supervisor (get-in clusterConf ["ports-per-supervisor"] 3) - inimbus (get-in clusterConf ["inimbus"] nil) - supervisor-slot-port-min (get-in clusterConf ["supervisor-slot-port-min"] 1024) - nimbus-daemon (get-in clusterConf ["nimbus-daemon"] false) - local-cluster-map (mk-local-storm-cluster :supervisors supervisors - :ports-per-supervisor ports-per-supervisor - :daemon-conf daemon-conf - :inimbus inimbus - :supervisor-slot-port-min supervisor-slot-port-min - :nimbus-daemon nimbus-daemon - )] - (LocalCluster. local-cluster-map)))) - -(defn -withSimulatedTimeLocalCluster - ([^MkClusterParam mkClusterParam ^TestJob code] - (with-cluster with-simulated-time-local-cluster mkClusterParam code)) - ([^TestJob code] - (-withSimulatedTimeLocalCluster (MkClusterParam.) code))) - -(defn -withTrackedCluster - ([^MkClusterParam mkClusterParam ^TestJob code] - (with-cluster with-tracked-cluster mkClusterParam code)) - ([^TestJob code] - (-withTrackedCluster (MkClusterParam.) code))) - -(defn- find-tuples - [^List fixed-tuples ^String stream] - (let [ret (ArrayList.)] - (doseq [fixed-tuple fixed-tuples] - (if (= (.stream fixed-tuple) stream) - (.add ret (.values fixed-tuple)))) - ret)) - -(defn -readTuples - ([^Map result ^String componentId ^String streamId] - (let [stream-result (.get result componentId) - ret (if stream-result - (find-tuples stream-result streamId) - [])] - ret)) - ([^Map result ^String componentId] - (-readTuples result componentId Utils/DEFAULT_STREAM_ID))) - -(defn -mkTrackedTopology - [^ILocalCluster trackedCluster ^StormTopology topology] - (-> (mk-tracked-topology (.getState trackedCluster) topology) - (TrackedTopology.))) - -(defn -trackedWait - ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms] - (tracked-wait trackedTopology amt timeout-ms)) - ([^TrackedTopology trackedTopology ^Integer amt] - (tracked-wait trackedTopology amt)) - ([^TrackedTopology trackedTopology] - (-trackedWait trackedTopology 1))) - -(defn -advanceClusterTime - ([^ILocalCluster cluster ^Integer secs ^Integer step] - (advance-cluster-time (.getState cluster) secs step)) - ([^ILocalCluster cluster ^Integer secs] - (-advanceClusterTime cluster secs 1))) - -(defn- multiseteq - [^Object obj1 ^Object obj2] - (let [obj1 (clojurify-structure obj1) - obj2 (clojurify-structure obj2)] - (ms= obj1 obj2))) - -(defn -multiseteq - [^Collection coll1 ^Collection coll2] - (multiseteq coll1 coll2)) - -(defn -multiseteq - [^Map coll1 ^Map coll2] - (multiseteq coll1 coll2)) - -(defn -testTuple - ([^List values] - (-testTuple values nil)) - ([^List values ^MkTupleParam param] - (if (nil? param) - (test-tuple values) - (let [stream (or (.getStream param) Utils/DEFAULT_STREAM_ID) - component (or (.getComponent param) "component") - fields (.getFields param)] - (test-tuple values :stream stream :component component :fields fields))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj deleted file mode 100644 index 8f4c659..0000000 --- a/storm-core/src/clj/backtype/storm/thrift.clj +++ /dev/null @@ -1,284 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns backtype.storm.thrift - (:import [java.util HashMap] - [java.io Serializable] - [backtype.storm.generated NodeInfo Assignment]) - (:import [backtype.storm.generated JavaObject Grouping Nimbus StormTopology - StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface - ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo - GlobalStreamId ComponentObject ComponentObject$_Fields - ShellComponent SupervisorInfo]) - (:import [backtype.storm.utils Utils NimbusClient]) - (:import [backtype.storm Constants]) - (:import [backtype.storm.security.auth ReqContext]) - (:import [backtype.storm.grouping CustomStreamGrouping]) - (:import [backtype.storm.topology TopologyBuilder]) - (:import [backtype.storm.clojure RichShellBolt RichShellSpout]) - (:import [org.apache.thrift.transport TTransport]) - (:use [backtype.storm util config log zookeeper])) - -(defn instantiate-java-object - [^JavaObject obj] - (let [name (symbol (.get_full_class_name obj)) - args (map (memfn getFieldValue) (.get_args_list obj))] - (eval `(new ~name ~@args)))) - -(def grouping-constants - {Grouping$_Fields/FIELDS :fields - Grouping$_Fields/SHUFFLE :shuffle - Grouping$_Fields/ALL :all - Grouping$_Fields/NONE :none - Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized - Grouping$_Fields/CUSTOM_OBJECT :custom-object - Grouping$_Fields/DIRECT :direct - Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle}) - -(defn grouping-type - [^Grouping grouping] - (grouping-constants (.getSetField grouping))) - -(defn field-grouping - [^Grouping grouping] - (when-not (= (grouping-type grouping) :fields) - (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping"))) - (.get_fields grouping)) - -(defn global-grouping? - [^Grouping grouping] - (and (= :fields (grouping-type grouping)) - (empty? (field-grouping grouping)))) - -(defn parallelism-hint - [^ComponentCommon component-common] - (let [phint (.get_parallelism_hint component-common)] - (if-not (.is_set_parallelism_hint component-common) 1 phint))) - -(defn nimbus-client-and-conn - ([host port] - (nimbus-client-and-conn host port nil)) - ([host port as-user] - (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user) - (let [conf (read-storm-config) - nimbusClient (NimbusClient. conf host port nil as-user) - client (.getClient nimbusClient) - transport (.transport nimbusClient)] - [client transport] ))) - -(defmacro with-nimbus-connection - [[client-sym host port] & body] - `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)] - (try - ~@body - (finally (.close conn#))))) - -(defmacro with-configured-nimbus-connection - [client-sym & body] - `(let [conf# (read-storm-config) - context# (ReqContext/context) - user# (if (.principal context#) (.getName (.principal context#))) - nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#) - ~client-sym (.getClient nimbusClient#) - conn# (.transport nimbusClient#) - ] - (try - ~@body - (finally (.close conn#))))) - -(defn direct-output-fields - [fields] - (StreamInfo. fields true)) - -(defn output-fields - [fields] - (StreamInfo. fields false)) - -(defn mk-output-spec - [output-spec] - (let [output-spec (if (map? output-spec) - output-spec - {Utils/DEFAULT_STREAM_ID output-spec})] - (map-val - (fn [out] - (if (instance? StreamInfo out) - out - (StreamInfo. out false))) - output-spec))) - -(defnk mk-plain-component-common - [inputs output-spec parallelism-hint :conf nil] - (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))] - (when parallelism-hint - (.set_parallelism_hint ret parallelism-hint)) - (when conf - (.set_json_conf ret (to-json conf))) - ret)) - -(defnk mk-spout-spec* - [spout outputs :p nil :conf nil] - (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout)) - (mk-plain-component-common {} outputs p :conf conf))) - -(defn mk-shuffle-grouping - [] - (Grouping/shuffle (NullStruct.))) - -(defn mk-local-or-shuffle-grouping - [] - (Grouping/local_or_shuffle (NullStruct.))) - -(defn mk-fields-grouping - [fields] - (Grouping/fields fields)) - -(defn mk-global-grouping - [] - (mk-fields-grouping [])) - -(defn mk-direct-grouping - [] - (Grouping/direct (NullStruct.))) - -(defn mk-all-grouping - [] - (Grouping/all (NullStruct.))) - -(defn mk-none-grouping - [] - (Grouping/none (NullStruct.))) - -(defn deserialized-component-object - [^ComponentObject obj] - (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA) - (throw (RuntimeException. "Cannot deserialize non-java-serialized object"))) - (Utils/javaDeserialize (.get_serialized_java obj) Serializable)) - -(defn serialize-component-object - [obj] - (ComponentObject/serialized_java (Utils/javaSerialize obj))) - -(defn- mk-grouping - [grouping-spec] - (cond (nil? grouping-spec) - (mk-none-grouping) - - (instance? Grouping grouping-spec) - grouping-spec - - (instance? CustomStreamGrouping grouping-spec) - (Grouping/custom_serialized (Utils/javaSerialize grouping-spec)) - - (instance? JavaObject grouping-spec) - (Grouping/custom_object grouping-spec) - - (sequential? grouping-spec) - (mk-fields-grouping grouping-spec) - - (= grouping-spec :shuffle) - (mk-shuffle-grouping) - - (= grouping-spec :local-or-shuffle) - (mk-local-or-shuffle-grouping) - (= grouping-spec :none) - (mk-none-grouping) - - (= grouping-spec :all) - (mk-all-grouping) - - (= grouping-spec :global) - (mk-global-grouping) - - (= grouping-spec :direct) - (mk-direct-grouping) - - true - (throw (IllegalArgumentException. - (str grouping-spec " is not a valid grouping"))))) - -(defn- mk-inputs - [inputs] - (into {} (for [[stream-id grouping-spec] inputs] - [(if (sequential? stream-id) - (GlobalStreamId. (first stream-id) (second stream-id)) - (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID)) - (mk-grouping grouping-spec)]))) - -(defnk mk-bolt-spec* - [inputs bolt outputs :p nil :conf nil] - (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)] - (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt)) - common))) - -(defnk mk-spout-spec - [spout :parallelism-hint nil :p nil :conf nil] - (let [parallelism-hint (if p p parallelism-hint)] - {:obj spout :p parallelism-hint :conf conf})) - -(defn- shell-component-params - [command script-or-output-spec kwargs] - (if (string? script-or-output-spec) - [(into-array String [command script-or-output-spec]) - (first kwargs) - (rest kwargs)] - [(into-array String command) - script-or-output-spec - kwargs])) - -(defnk mk-bolt-spec - [inputs bolt :parallelism-hint nil :p nil :conf nil] - (let [parallelism-hint (if p p parallelism-hint)] - {:obj bolt :inputs inputs :p parallelism-hint :conf conf})) - -(defn mk-shell-bolt-spec - [inputs command script-or-output-spec & kwargs] - (let [[command output-spec kwargs] - (shell-component-params command script-or-output-spec kwargs)] - (apply mk-bolt-spec inputs - (RichShellBolt. command (mk-output-spec output-spec)) kwargs))) - -(defn mk-shell-spout-spec - [command script-or-output-spec & kwargs] - (let [[command output-spec kwargs] - (shell-component-params command script-or-output-spec kwargs)] - (apply mk-spout-spec - (RichShellSpout. command (mk-output-spec output-spec)) kwargs))) - -(defn- add-inputs - [declarer inputs] - (doseq [[id grouping] (mk-inputs inputs)] - (.grouping declarer id grouping))) - -(defn mk-topology - ([spout-map bolt-map] - (let [builder (TopologyBuilder.)] - (doseq [[name {spout :obj p :p conf :conf}] spout-map] - (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf))) - (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map] - (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs))) - (.createTopology builder))) - ([spout-map bolt-map state-spout-map] - (mk-topology spout-map bolt-map))) - -;; clojurify-structure is needed or else every element becomes the same after successive calls -;; don't know why this happens -(def STORM-TOPOLOGY-FIELDS - (-> StormTopology/metaDataMap clojurify-structure keys)) - -(def SPOUT-FIELDS - [StormTopology$_Fields/SPOUTS - StormTopology$_Fields/STATE_SPOUTS]) - http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/timer.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/timer.clj b/storm-core/src/clj/backtype/storm/timer.clj deleted file mode 100644 index b5f73f7..0000000 --- a/storm-core/src/clj/backtype/storm/timer.clj +++ /dev/null @@ -1,128 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns backtype.storm.timer - (:import [backtype.storm.utils Time]) - (:import [java.util PriorityQueue Comparator Random]) - (:import [java.util.concurrent Semaphore]) - (:use [backtype.storm util log])) - -;; The timer defined in this file is very similar to java.util.Timer, except -;; it integrates with Storm's time simulation capabilities. This lets us test -;; code that does asynchronous work on the timer thread - -(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil] - (let [queue (PriorityQueue. 10 (reify Comparator - (compare - [this o1 o2] - (- (first o1) (first o2))) - (equals - [this obj] - true))) - active (atom true) - lock (Object.) - notifier (Semaphore. 0) - thread-name (if timer-name timer-name "timer") - timer-thread (Thread. - (fn [] - (while @active - (try - (let [[time-millis _ _ :as elem] (locking lock (.peek queue))] - (if (and elem (>= (current-time-millis) time-millis)) - ;; It is imperative to not run the function - ;; inside the timer lock. Otherwise, it is - ;; possible to deadlock if the fn deals with - ;; other locks, like the submit lock. - (let [afn (locking lock (second (.poll queue)))] - (afn)) - (if time-millis - ;; If any events are scheduled, sleep until - ;; event generation. If any recurring events - ;; are scheduled then we will always go - ;; through this branch, sleeping only the - ;; exact necessary amount of time. We give - ;; an upper bound, e.g. 1000 millis, to the - ;; sleeping time, to limit the response time - ;; for detecting any new event within 1 secs. - (Time/sleep (min 1000 (- time-millis (current-time-millis)))) - ;; Otherwise poll to see if any new event - ;; was scheduled. This is, in essence, the - ;; response time for detecting any new event - ;; schedulings when there are no scheduled - ;; events. - (Time/sleep 1000)))) - (catch Throwable t - ;; Because the interrupted exception can be - ;; wrapped in a RuntimeException. - (when-not (exception-cause? InterruptedException t) - (kill-fn t) - (reset! active false) - (throw t))))) - (.release notifier)) thread-name)] - (.setDaemon timer-thread true) - (.setPriority timer-thread Thread/MAX_PRIORITY) - (.start timer-thread) - {:timer-thread timer-thread - :queue queue - :active active - :lock lock - :random (Random.) - :cancel-notifier notifier})) - -(defn- check-active! - [timer] - (when-not @(:active timer) - (throw (IllegalStateException. "Timer is not active")))) - -(defnk schedule - [timer delay-secs afn :check-active true :jitter-ms 0] - (when check-active (check-active! timer)) - (let [id (uuid) - ^PriorityQueue queue (:queue timer) - end-time-ms (+ (current-time-millis) (secs-to-millis-long delay-secs)) - end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)] - (locking (:lock timer) - (.add queue [end-time-ms afn id])))) - -(defn schedule-recurring - [timer delay-secs recur-secs afn] - (schedule timer - delay-secs - (fn this [] - (afn) - ; This avoids a race condition with cancel-timer. - (schedule timer recur-secs this :check-active false)))) - -(defn schedule-recurring-with-jitter - [timer delay-secs recur-secs jitter-ms afn] - (schedule timer - delay-secs - (fn this [] - (afn) - ; This avoids a race condition with cancel-timer. - (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms)))) - -(defn cancel-timer - [timer] - (check-active! timer) - (locking (:lock timer) - (reset! (:active timer) false) - (.interrupt (:timer-thread timer))) - (.acquire (:cancel-notifier timer))) - -(defn timer-waiting? - [timer] - (Time/isThreadWaiting (:timer-thread timer)))
