http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj new file mode 100644 index 0000000..87ca2de --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -0,0 +1,701 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+ +(ns org.apache.storm.testing + (:require [org.apache.storm.daemon + [nimbus :as nimbus] + [supervisor :as supervisor] + [common :as common] + [worker :as worker] + [executor :as executor]]) + (:require [org.apache.storm [process-simulator :as psim]]) + (:import [org.apache.commons.io FileUtils]) + (:import [java.io File]) + (:import [java.util HashMap ArrayList]) + (:import [java.util.concurrent.atomic AtomicInteger]) + (:import [java.util.concurrent ConcurrentHashMap]) + (:import [org.apache.storm.utils Time Utils RegisteredGlobalState]) + (:import [org.apache.storm.tuple Fields Tuple TupleImpl]) + (:import [org.apache.storm.task TopologyContext]) + (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions]) + (:import [org.apache.storm.testing FeederSpout FixedTupleSpout FixedTuple + TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker + TestWordSpout MemoryTransactionalSpout]) + (:import [org.apache.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils]) + (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo + ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus + KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo + ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice]) + (:import [org.apache.storm.transactional TransactionalSpoutCoordinator]) + (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor]) + (:import [org.apache.storm.tuple Tuple]) + (:import [org.apache.storm.generated StormTopology]) + (:import [org.apache.storm.task TopologyContext]) + (:require [org.apache.storm [zookeeper :as zk]]) + (:require [org.apache.storm.messaging.loader :as msg-loader]) + (:require [org.apache.storm.daemon.acker :as acker]) + (:use [org.apache.storm cluster util thrift config log local-state])) + +(defn feeder-spout + [fields] + 
(FeederSpout. (Fields. fields))) + +(defn local-temp-path + [] + (str (System/getProperty "java.io.tmpdir") (if-not on-windows? "/") (uuid))) + +(defn delete-all + [paths] + (dorun + (for [t paths] + (if (.exists (File. t)) + (try + (FileUtils/forceDelete (File. t)) + (catch Exception e + (log-message (.getMessage e)))))))) + +(defmacro with-local-tmp + [[& tmp-syms] & body] + (let [tmp-paths (mapcat (fn [t] [t `(local-temp-path)]) tmp-syms)] + `(let [~@tmp-paths] + (try + ~@body + (finally + (delete-all ~(vec tmp-syms))))))) + +(defn start-simulating-time! + [] + (Time/startSimulating)) + +(defn stop-simulating-time! + [] + (Time/stopSimulating)) + + (defmacro with-simulated-time + [& body] + `(try + (start-simulating-time!) + ~@body + (finally + (stop-simulating-time!)))) + +(defn advance-time-ms! [ms] + (Time/advanceTime ms)) + +(defn advance-time-secs! [secs] + (advance-time-ms! (* (long secs) 1000))) + +(defnk add-supervisor + [cluster-map :ports 2 :conf {} :id nil] + (let [tmp-dir (local-temp-path) + port-ids (if (sequential? ports) + ports + (doall (repeatedly ports (:port-counter cluster-map)))) + supervisor-conf (merge (:daemon-conf cluster-map) + conf + {STORM-LOCAL-DIR tmp-dir + SUPERVISOR-SLOTS-PORTS port-ids}) + id-fn (if id (fn [] id) supervisor/generate-supervisor-id) + daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))] + (swap! (:supervisors cluster-map) conj daemon) + (swap! (:tmp-dirs cluster-map) conj tmp-dir) + daemon)) + +(defn mk-shared-context [conf] + (if-not (conf STORM-LOCAL-MODE-ZMQ) + (msg-loader/mk-local-context))) + +(defn start-nimbus-daemon [conf nimbus] + (let [server (ThriftServer. conf (Nimbus$Processor. nimbus) + ThriftConnectionType/NIMBUS) + nimbus-thread (Thread. 
(fn [] (.serve server)))] + (log-message "Starting Nimbus server...") + (.start nimbus-thread) + server)) + + +;; returns map containing cluster info +;; local dir is always overridden in maps +;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter +;; if need to customize amt of ports more, can use add-supervisor calls afterwards +(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false] + (let [zk-tmp (local-temp-path) + [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) + (zk/mk-inprocess-zookeeper zk-tmp)) + daemon-conf (merge (read-storm-config) + {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true + ZMQ-LINGER-MILLIS 0 + TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false + TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 + STORM-CLUSTER-MODE "local" + BLOBSTORE-SUPERUSER (System/getProperty "user.name")} + (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) + {STORM-ZOOKEEPER-PORT zk-port + STORM-ZOOKEEPER-SERVERS ["localhost"]}) + daemon-conf) + nimbus-tmp (local-temp-path) + port-counter (mk-counter supervisor-slot-port-min) + nimbus (nimbus/service-handler + (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) + (if inimbus inimbus (nimbus/standalone-nimbus))) + context (mk-shared-context daemon-conf) + nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil) + cluster-map {:nimbus nimbus + :port-counter port-counter + :daemon-conf daemon-conf + :supervisors (atom []) + :state (mk-distributed-cluster-state daemon-conf) + :storm-cluster-state (mk-storm-cluster-state daemon-conf) + :tmp-dirs (atom [nimbus-tmp zk-tmp]) + :zookeeper (if (not-nil? zk-handle) zk-handle) + :shared-context context + :nimbus-thrift-server nimbus-thrift-server} + supervisor-confs (if (sequential? 
supervisors) + supervisors + (repeat supervisors {}))] + + (doseq [sc supervisor-confs] + (add-supervisor cluster-map :ports ports-per-supervisor :conf sc)) + cluster-map)) + +(defn get-supervisor [cluster-map supervisor-id] + (let [finder-fn #(= (.get-id %) supervisor-id)] + (find-first finder-fn @(:supervisors cluster-map)))) + +(defn kill-supervisor [cluster-map supervisor-id] + (let [finder-fn #(= (.get-id %) supervisor-id) + supervisors @(:supervisors cluster-map) + sup (find-first finder-fn + supervisors)] + ;; tmp-dir will be taken care of by shutdown + (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors)) + (.shutdown sup))) + +(defn kill-local-storm-cluster [cluster-map] + (.shutdown (:nimbus cluster-map)) + (if (not-nil? (:nimbus-thrift-server cluster-map)) + (do + (log-message "shutting down thrift server") + (try + (.stop (:nimbus-thrift-server cluster-map)) + (catch Exception e (log-message "failed to stop thrift"))) + )) + (.close (:state cluster-map)) + (.disconnect (:storm-cluster-state cluster-map)) + (doseq [s @(:supervisors cluster-map)] + (.shutdown-all-workers s) + ;; race condition here? will it launch the workers again? + (supervisor/kill-supervisor s)) + (psim/kill-all-processes) + (if (not-nil? 
(:zookeeper cluster-map)) + (do + (log-message "Shutting down in process zookeeper") + (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map)) + (log-message "Done shutting down in process zookeeper"))) + (doseq [t @(:tmp-dirs cluster-map)] + (log-message "Deleting temporary path " t) + (try + (rmr t) + ;; on windows, the host process still holds lock on the logfile + (catch Exception e (log-message (.getMessage e)))) )) + +(def TEST-TIMEOUT-MS + (let [timeout (System/getenv "STORM_TEST_TIMEOUT_MS")] + (parse-int (if timeout timeout "5000")))) + +(defmacro while-timeout [timeout-ms condition & body] + `(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)] + (log-debug "Looping until " '~condition) + (while ~condition + (when (> (System/currentTimeMillis) end-time#) + (let [thread-dump# (Utils/threadDump)] + (log-message "Condition " '~condition " not met in " ~timeout-ms "ms") + (log-message thread-dump#) + (throw (AssertionError. (str "Test timed out (" ~timeout-ms "ms) " '~condition))))) + ~@body) + (log-debug "Condition met " '~condition))) + +(defn wait-for-condition + ([apredicate] + (wait-for-condition TEST-TIMEOUT-MS apredicate)) + ([timeout-ms apredicate] + (while-timeout timeout-ms (not (apredicate)) + (Time/sleep 100)))) + +(defn wait-until-cluster-waiting + "Wait until the cluster is idle. Should be used with time simulation." + ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS)) + ([cluster-map timeout-ms] + ;; wait until all workers, supervisors, and nimbus is waiting + (let [supervisors @(:supervisors cluster-map) + workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes)) + daemons (concat + [(:nimbus cluster-map)] + supervisors + ; because a worker may already be dead + workers)] + (while-timeout timeout-ms (not (every? (memfn waiting?) daemons)) + (Thread/sleep (rand-int 20)) + ;; (doseq [d daemons] + ;; (if-not ((memfn waiting?) 
d) + ;; (println d))) + )))) + +(defn advance-cluster-time + ([cluster-map secs increment-secs] + (loop [left secs] + (when (> left 0) + (let [diff (min left increment-secs)] + (advance-time-secs! diff) + (wait-until-cluster-waiting cluster-map) + (recur (- left diff)))))) + ([cluster-map secs] + (advance-cluster-time cluster-map secs 1))) + +(defmacro with-local-cluster + [[cluster-sym & args] & body] + `(let [~cluster-sym (mk-local-storm-cluster ~@args)] + (try + ~@body + (catch Throwable t# + (log-error t# "Error in cluster") + (throw t#)) + (finally + (let [keep-waiting?# (atom true) + f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))] + (kill-local-storm-cluster ~cluster-sym) + (reset! keep-waiting?# false) + @f#))))) + +(defmacro with-simulated-time-local-cluster + [& args] + `(with-simulated-time + (with-local-cluster ~@args))) + +(defmacro with-inprocess-zookeeper + [port-sym & body] + `(with-local-tmp [tmp#] + (let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)] + (try + ~@body + (finally + (zk/shutdown-inprocess-zookeeper zks#)))))) + +(defn submit-local-topology + [nimbus storm-name conf topology] + (when-not (Utils/isValidConf conf) + (throw (IllegalArgumentException. "Topology conf is not json-serializable"))) + (.submitTopology nimbus storm-name nil (to-json conf) topology)) + +(defn submit-local-topology-with-opts + [nimbus storm-name conf topology submit-opts] + (when-not (Utils/isValidConf conf) + (throw (IllegalArgumentException. 
"Topology conf is not json-serializable"))) + (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts)) + +(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources] + (fn [existing-assignments] + (let [topology-id (common/get-storm-id storm-cluster-state storm-name) + existing-assignments (into {} (for [[tid assignment] existing-assignments] + {tid (:worker->resources assignment)})) + new-assignments (assoc existing-assignments topology-id worker->resources)] + new-assignments))) + +(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port] + (fn [new-scheduler-assignments existing-assignments] + (let [topology-id (common/get-storm-id storm-cluster-state storm-name) + existing-assignments (into {} (for [[tid assignment] existing-assignments] + {tid (:executor->node+port assignment)})) + new-assignments (assoc existing-assignments topology-id executor->node+port)] + new-assignments))) + +(defn mocked-compute-new-scheduler-assignments [] + (fn [nimbus existing-assignments topologies scratch-topology-id] + existing-assignments)) + +(defn submit-mocked-assignment + [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources] + (with-var-roots [common/storm-task-info (fn [& ignored] task->component) + nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments) + nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources + storm-cluster-state + storm-name + worker->resources) + nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port + storm-cluster-state + storm-name + executor->node+port)] + (submit-local-topology nimbus storm-name conf topology))) + +(defn mk-capture-launch-fn [capture-atom] + (fn [supervisor storm-id port worker-id mem-onheap] + (let [supervisor-id (:supervisor-id supervisor) + 
conf (:conf supervisor) + existing (get @capture-atom [supervisor-id port] [])] + (set-worker-user! conf worker-id "") + (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id))))) + +(defn find-worker-id + [supervisor-conf port] + (let [supervisor-state (supervisor-state supervisor-conf) + worker->port (ls-approved-workers supervisor-state)] + (first ((reverse-map worker->port) port)))) + +(defn find-worker-port + [supervisor-conf worker-id] + (let [supervisor-state (supervisor-state supervisor-conf) + worker->port (ls-approved-workers supervisor-state)] + (worker->port worker-id))) + +(defn mk-capture-shutdown-fn + [capture-atom] + (let [existing-fn supervisor/shutdown-worker] + (fn [supervisor worker-id] + (let [conf (:conf supervisor) + supervisor-id (:supervisor-id supervisor) + port (find-worker-port conf worker-id) + existing (get @capture-atom [supervisor-id port] 0)] + (swap! capture-atom assoc [supervisor-id port] (inc existing)) + (existing-fn supervisor worker-id))))) + +(defmacro capture-changed-workers + [& body] + `(let [launch-captured# (atom {}) + shutdown-captured# (atom {})] + (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#) + supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)] + ~@body + {:launched @launch-captured# + :shutdown @shutdown-captured#}))) + +(defmacro capture-launched-workers + [& body] + `(:launched (capture-changed-workers ~@body))) + +(defmacro capture-shutdown-workers + [& body] + `(:shutdown (capture-changed-workers ~@body))) + +(defnk aggregated-stat + [cluster-map storm-name stat-key :component-ids nil] + (let [state (:storm-cluster-state cluster-map) + nimbus (:nimbus cluster-map) + storm-id (common/get-storm-id state storm-name) + component->tasks (reverse-map + (common/storm-task-info + (.getUserTopology nimbus storm-id) + (from-json (.getTopologyConf nimbus storm-id)))) + component->tasks (if component-ids + (select-keys component->tasks component-ids) + 
component->tasks) + task-ids (apply concat (vals component->tasks)) + assignment (.assignment-info state storm-id nil) + taskbeats (.taskbeats state storm-id (:task->node+port assignment)) + heartbeats (dofor [id task-ids] (get taskbeats id)) + stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))] + (reduce + stats))) + +(defn emitted-spout-tuples + [cluster-map topology storm-name] + (aggregated-stat + cluster-map + storm-name + :emitted + :component-ids (keys (.get_spouts topology)))) + +(defn transferred-tuples + [cluster-map storm-name] + (aggregated-stat cluster-map storm-name :transferred)) + +(defn acked-tuples + [cluster-map storm-name] + (aggregated-stat cluster-map storm-name :acked)) + +(defn simulate-wait + [cluster-map] + (if (Time/isSimulating) + (advance-cluster-time cluster-map 10) + (Thread/sleep 100))) + +(defprotocol CompletableSpout + (exhausted? + [this] + "Whether all the tuples for this spout have been completed.") + (cleanup + [this] + "Cleanup any global state kept") + (startup + [this] + "Prepare the spout (globally) before starting the topology")) + +(extend-type FixedTupleSpout + CompletableSpout + (exhausted? [this] + (= (-> this .getSourceTuples count) + (.getCompleted this))) + (cleanup [this] + (.cleanup this)) + (startup [this])) + +(extend-type TransactionalSpoutCoordinator + CompletableSpout + (exhausted? [this] + (exhausted? (.getSpout this))) + (cleanup [this] + (cleanup (.getSpout this))) + (startup [this] + (startup (.getSpout this)))) + +(extend-type PartitionedTransactionalSpoutExecutor + CompletableSpout + (exhausted? [this] + (exhausted? (.getPartitionedSpout this))) + (cleanup [this] + (cleanup (.getPartitionedSpout this))) + (startup [this] + (startup (.getPartitionedSpout this)))) + +(extend-type MemoryTransactionalSpout + CompletableSpout + (exhausted? 
[this] + (.isExhaustedTuples this)) + (cleanup [this] + (.cleanup this)) + (startup [this] + (.startup this))) + +(defn spout-objects [spec-map] + (for [[_ spout-spec] spec-map] + (-> spout-spec + .get_spout_object + deserialized-component-object))) + +(defn capture-topology + [topology] + (let [topology (.deepCopy topology) + spouts (.get_spouts topology) + bolts (.get_bolts topology) + all-streams (apply concat + (for [[id spec] (merge (clojurify-structure spouts) + (clojurify-structure bolts))] + (for [[stream info] (.. spec get_common get_streams)] + [(GlobalStreamId. id stream) (.is_direct info)]))) + capturer (TupleCaptureBolt.)] + (.set_bolts topology + (assoc (clojurify-structure bolts) + (uuid) + (Bolt. + (serialize-component-object capturer) + (mk-plain-component-common (into {} (for [[id direct?] all-streams] + [id (if direct? + (mk-direct-grouping) + (mk-global-grouping))])) + {} + nil)))) + {:topology topology + :capturer capturer})) + +;; TODO: mock-sources needs to be able to mock out state spouts as well +(defnk complete-topology + [cluster-map topology + :mock-sources {} + :storm-conf {} + :cleanup-state true + :topology-name nil + :timeout-ms TEST-TIMEOUT-MS] + ;; TODO: the idea of mocking for transactional topologies should be done an + ;; abstraction level above... should have a complete-transactional-topology for this + (let [{topology :topology capturer :capturer} (capture-topology topology) + storm-name (or topology-name (str "topologytest-" (uuid))) + state (:storm-cluster-state cluster-map) + spouts (.get_spouts topology) + replacements (map-val (fn [v] + (FixedTupleSpout. + (for [tup v] + (if (map? tup) + (FixedTuple. (:stream tup) (:values tup)) + tup)))) + mock-sources)] + (doseq [[id spout] replacements] + (let [spout-spec (get spouts id)] + (.set_spout_object spout-spec (serialize-component-object spout)))) + (doseq [spout (spout-objects spouts)] + (when-not (extends? CompletableSpout (.getClass spout)) + (throw (RuntimeException. 
(str "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " spout))))) + + (doseq [spout (spout-objects spouts)] + (startup spout)) + + (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology) + (advance-cluster-time cluster-map 11) + + (let [storm-id (common/get-storm-id state storm-name)] + ;;Give the topology time to come up without using it to wait for the spouts to complete + (simulate-wait cluster-map) + + (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts))) + (simulate-wait cluster-map)) + + (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0))) + (while-timeout timeout-ms (.assignment-info state storm-id nil) + (simulate-wait cluster-map)) + (when cleanup-state + (doseq [spout (spout-objects spouts)] + (cleanup spout)))) + + (if cleanup-state + (.getAndRemoveResults capturer) + (.getAndClearResults capturer)))) + +(defn read-tuples + ([results component-id stream-id] + (let [fixed-tuples (get results component-id [])] + (mapcat + (fn [ft] + (if (= stream-id (. ft stream)) + [(vec (. ft values))])) + fixed-tuples) + )) + ([results component-id] + (read-tuples results component-id Utils/DEFAULT_STREAM_ID))) + +(defn ms= + [& args] + (apply = (map multi-set args))) + +(def TRACKER-BOLT-ID "+++tracker-bolt") + +;; TODO: should override system-topology! and wrap everything there +(defn mk-tracked-topology + ([tracked-cluster topology] + (let [track-id (::track-id tracked-cluster) + ret (.deepCopy topology)] + (dofor [[_ bolt] (.get_bolts ret) + :let [obj (deserialized-component-object (.get_bolt_object bolt))]] + (.set_bolt_object bolt (serialize-component-object + (BoltTracker. obj track-id)))) + (dofor [[_ spout] (.get_spouts ret) + :let [obj (deserialized-component-object (.get_spout_object spout))]] + (.set_spout_object spout (serialize-component-object + (SpoutTracker. 
obj track-id)))) + {:topology ret + :last-spout-emit (atom 0) + :cluster tracked-cluster}))) + +(defn assoc-track-id + [cluster track-id] + (assoc cluster ::track-id track-id)) + +(defn increment-global! + [id key amt] + (-> (RegisteredGlobalState/getState id) + (get key) + (.addAndGet amt))) + +(defn global-amt + [id key] + (-> (RegisteredGlobalState/getState id) + (get key) + .get)) + +(defmacro with-tracked-cluster + [[cluster-sym & cluster-args] & body] + `(let [id# (uuid)] + (RegisteredGlobalState/setState + id# + (doto (ConcurrentHashMap.) + (.put "spout-emitted" (AtomicInteger. 0)) + (.put "transferred" (AtomicInteger. 0)) + (.put "processed" (AtomicInteger. 0)))) + (with-var-roots + [acker/mk-acker-bolt + (let [old# acker/mk-acker-bolt] + (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#))) + ;; critical that this particular function is overridden here, + ;; since the transferred stat needs to be incremented at the moment + ;; of tuple emission (and not on a separate thread later) for + ;; topologies to be tracked correctly. This is because "transferred" *must* + ;; be incremented before "processing". + executor/mk-executor-transfer-fn + (let [old# executor/mk-executor-transfer-fn] + (fn [& args#] + (let [transferrer# (apply old# args#)] + (fn [& args2#] + ;; (log-message "Transferring: " transfer-args#) + (increment-global! id# "transferred" 1) + (apply transferrer# args2#)))))] + (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args] + (let [~cluster-sym (assoc-track-id ~cluster-sym id#)] + ~@body))) + (RegisteredGlobalState/clearState id#))) + +(defn tracked-wait + "Waits until topology is idle and 'amt' more tuples have been emitted by spouts." 
+ ([tracked-topology] + (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS)) + ([tracked-topology amt] + (tracked-wait tracked-topology amt TEST-TIMEOUT-MS)) + ([tracked-topology amt timeout-ms] + (let [target (+ amt @(:last-spout-emit tracked-topology)) + track-id (-> tracked-topology :cluster ::track-id) + waiting? (fn [] + (or (not= target (global-amt track-id "spout-emitted")) + (not= (global-amt track-id "transferred") + (global-amt track-id "processed"))))] + (while-timeout timeout-ms (waiting?) + ;; (println "Spout emitted: " (global-amt track-id "spout-emitted")) + ;; (println "Processed: " (global-amt track-id "processed")) + ;; (println "Transferred: " (global-amt track-id "transferred")) + (Thread/sleep (rand-int 200))) + (reset! (:last-spout-emit tracked-topology) target)))) + +(defnk test-tuple + [values + :stream Utils/DEFAULT_STREAM_ID + :component "component" + :fields nil] + (let [fields (or fields + (->> (iterate inc 1) + (take (count values)) + (map #(str "field" %)))) + spout-spec (mk-spout-spec* (TestWordSpout.) + {stream fields}) + topology (StormTopology. {component spout-spec} {} {}) + context (TopologyContext. + topology + (read-storm-config) + {(int 1) component} + {component [(int 1)]} + {component {stream (Fields. fields)}} + "test-storm-id" + nil + nil + (int 1) + nil + [(int 1)] + {} + {} + (HashMap.) + (HashMap.) + (atom false))] + (TupleImpl. context values 1 stream))) + +(defmacro with-timeout + [millis unit & body] + `(let [f# (future ~@body)] + (try + (.get f# ~millis ~unit) + (finally (future-cancel f#)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/testing4j.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing4j.clj b/storm-core/src/clj/org/apache/storm/testing4j.clj new file mode 100644 index 0000000..5850262 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/testing4j.clj @@ -0,0 +1,184 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+(ns org.apache.storm.testing4j + (:import [java.util Map List Collection ArrayList]) + (:require [org.apache.storm [LocalCluster :as LocalCluster]]) + (:import [org.apache.storm Config ILocalCluster LocalCluster]) + (:import [org.apache.storm.generated StormTopology]) + (:import [org.apache.storm.daemon nimbus]) + (:import [org.apache.storm.testing TestJob MockedSources TrackedTopology + MkClusterParam CompleteTopologyParam MkTupleParam]) + (:import [org.apache.storm.utils Utils]) + (:use [org.apache.storm testing util log]) + (:gen-class + :name org.apache.storm.Testing + :methods [^:static [completeTopology + [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology + org.apache.storm.testing.CompleteTopologyParam] + java.util.Map] + ^:static [completeTopology + [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology] + java.util.Map] + ^:static [withSimulatedTime [Runnable] void] + ^:static [withLocalCluster [org.apache.storm.testing.TestJob] void] + ^:static [withLocalCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void] + ^:static [getLocalCluster [java.util.Map] org.apache.storm.ILocalCluster] + ^:static [withSimulatedTimeLocalCluster [org.apache.storm.testing.TestJob] void] + ^:static [withSimulatedTimeLocalCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void] + ^:static [withTrackedCluster [org.apache.storm.testing.TestJob] void] + ^:static [withTrackedCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void] + ^:static [readTuples [java.util.Map String String] java.util.List] + ^:static [readTuples [java.util.Map String] java.util.List] + ^:static [mkTrackedTopology [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology] org.apache.storm.testing.TrackedTopology] + ^:static [trackedWait [org.apache.storm.testing.TrackedTopology] void] + ^:static [trackedWait [org.apache.storm.testing.TrackedTopology Integer] 
void] + ^:static [trackedWait [org.apache.storm.testing.TrackedTopology Integer Integer] void] + ^:static [advanceClusterTime [org.apache.storm.ILocalCluster Integer Integer] void] + ^:static [advanceClusterTime [org.apache.storm.ILocalCluster Integer] void] + ^:static [multiseteq [java.util.Collection java.util.Collection] boolean] + ^:static [multiseteq [java.util.Map java.util.Map] boolean] + ^:static [testTuple [java.util.List] org.apache.storm.tuple.Tuple] + ^:static [testTuple [java.util.List org.apache.storm.testing.MkTupleParam] org.apache.storm.tuple.Tuple]])) + +(defn -completeTopology + ([^ILocalCluster cluster ^StormTopology topology ^CompleteTopologyParam completeTopologyParam] + (let [mocked-sources (or (-> completeTopologyParam .getMockedSources .getData) {}) + storm-conf (or (.getStormConf completeTopologyParam) {}) + cleanup-state (or (.getCleanupState completeTopologyParam) true) + topology-name (.getTopologyName completeTopologyParam) + timeout-ms (or (.getTimeoutMs completeTopologyParam) TEST-TIMEOUT-MS)] + (complete-topology (.getState cluster) topology + :mock-sources mocked-sources + :storm-conf storm-conf + :cleanup-state cleanup-state + :topology-name topology-name + :timeout-ms timeout-ms))) + ([^ILocalCluster cluster ^StormTopology topology] + (-completeTopology cluster topology (CompleteTopologyParam.)))) + + +(defn -withSimulatedTime + [^Runnable code] + (with-simulated-time + (.run code))) + +(defmacro with-cluster + [cluster-type mkClusterParam code] + `(let [supervisors# (or (.getSupervisors ~mkClusterParam) 2) + ports-per-supervisor# (or (.getPortsPerSupervisor ~mkClusterParam) 3) + daemon-conf# (or (.getDaemonConf ~mkClusterParam) {})] + (~cluster-type [cluster# :supervisors supervisors# + :ports-per-supervisor ports-per-supervisor# + :daemon-conf daemon-conf#] + (let [cluster# (LocalCluster. 
cluster#)] + (.run ~code cluster#))))) + +(defn -withLocalCluster + ([^MkClusterParam mkClusterParam ^TestJob code] + (with-cluster with-local-cluster mkClusterParam code)) + ([^TestJob code] + (-withLocalCluster (MkClusterParam.) code))) + +(defn -getLocalCluster + ([^Map clusterConf] + (let [daemon-conf (get-in clusterConf ["daemon-conf"] {}) + supervisors (get-in clusterConf ["supervisors"] 2) + ports-per-supervisor (get-in clusterConf ["ports-per-supervisor"] 3) + inimbus (get-in clusterConf ["inimbus"] nil) + supervisor-slot-port-min (get-in clusterConf ["supervisor-slot-port-min"] 1024) + nimbus-daemon (get-in clusterConf ["nimbus-daemon"] false) + local-cluster-map (mk-local-storm-cluster :supervisors supervisors + :ports-per-supervisor ports-per-supervisor + :daemon-conf daemon-conf + :inimbus inimbus + :supervisor-slot-port-min supervisor-slot-port-min + :nimbus-daemon nimbus-daemon + )] + (LocalCluster. local-cluster-map)))) + +(defn -withSimulatedTimeLocalCluster + ([^MkClusterParam mkClusterParam ^TestJob code] + (with-cluster with-simulated-time-local-cluster mkClusterParam code)) + ([^TestJob code] + (-withSimulatedTimeLocalCluster (MkClusterParam.) code))) + +(defn -withTrackedCluster + ([^MkClusterParam mkClusterParam ^TestJob code] + (with-cluster with-tracked-cluster mkClusterParam code)) + ([^TestJob code] + (-withTrackedCluster (MkClusterParam.) 
code))) + +(defn- find-tuples + [^List fixed-tuples ^String stream] + (let [ret (ArrayList.)] + (doseq [fixed-tuple fixed-tuples] + (if (= (.stream fixed-tuple) stream) + (.add ret (.values fixed-tuple)))) + ret)) + +(defn -readTuples + ([^Map result ^String componentId ^String streamId] + (let [stream-result (.get result componentId) + ret (if stream-result + (find-tuples stream-result streamId) + [])] + ret)) + ([^Map result ^String componentId] + (-readTuples result componentId Utils/DEFAULT_STREAM_ID))) + +(defn -mkTrackedTopology + [^ILocalCluster trackedCluster ^StormTopology topology] + (-> (mk-tracked-topology (.getState trackedCluster) topology) + (TrackedTopology.))) + +(defn -trackedWait + ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms] + (tracked-wait trackedTopology amt timeout-ms)) + ([^TrackedTopology trackedTopology ^Integer amt] + (tracked-wait trackedTopology amt)) + ([^TrackedTopology trackedTopology] + (-trackedWait trackedTopology 1))) + +(defn -advanceClusterTime + ([^ILocalCluster cluster ^Integer secs ^Integer step] + (advance-cluster-time (.getState cluster) secs step)) + ([^ILocalCluster cluster ^Integer secs] + (-advanceClusterTime cluster secs 1))) + +(defn- multiseteq + [^Object obj1 ^Object obj2] + (let [obj1 (clojurify-structure obj1) + obj2 (clojurify-structure obj2)] + (ms= obj1 obj2))) + +(defn -multiseteq + [^Collection coll1 ^Collection coll2] + (multiseteq coll1 coll2)) + +(defn -multiseteq + [^Map coll1 ^Map coll2] + (multiseteq coll1 coll2)) + +(defn -testTuple + ([^List values] + (-testTuple values nil)) + ([^List values ^MkTupleParam param] + (if (nil? 
param) + (test-tuple values) + (let [stream (or (.getStream param) Utils/DEFAULT_STREAM_ID) + component (or (.getComponent param) "component") + fields (.getFields param)] + (test-tuple values :stream stream :component component :fields fields))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/thrift.clj b/storm-core/src/clj/org/apache/storm/thrift.clj new file mode 100644 index 0000000..47e233a --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/thrift.clj @@ -0,0 +1,284 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
(ns org.apache.storm.thrift
  (:import [java.util HashMap]
           [java.io Serializable]
           [org.apache.storm.generated NodeInfo Assignment])
  (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology
            StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
            ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
            GlobalStreamId ComponentObject ComponentObject$_Fields
            ShellComponent SupervisorInfo])
  (:import [org.apache.storm.utils Utils NimbusClient])
  (:import [org.apache.storm Constants])
  (:import [org.apache.storm.security.auth ReqContext])
  (:import [org.apache.storm.grouping CustomStreamGrouping])
  (:import [org.apache.storm.topology TopologyBuilder])
  (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
  (:import [org.apache.thrift.transport TTransport])
  (:use [org.apache.storm util config log zookeeper]))

(defn instantiate-java-object
  "Reflectively construct an instance of the class named by the Thrift
  JavaObject `obj`, passing the JavaObject's argument list to the
  constructor."
  [^JavaObject obj]
  (let [name (symbol (.get_full_class_name obj))
        args (map (memfn getFieldValue) (.get_args_list obj))]
    (eval `(new ~name ~@args))))

;; Maps each Thrift Grouping union field to the keyword used by the
;; Clojure topology DSL.
(def grouping-constants
  {Grouping$_Fields/FIELDS :fields
   Grouping$_Fields/SHUFFLE :shuffle
   Grouping$_Fields/ALL :all
   Grouping$_Fields/NONE :none
   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
   Grouping$_Fields/CUSTOM_OBJECT :custom-object
   Grouping$_Fields/DIRECT :direct
   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})

(defn grouping-type
  "Return the DSL keyword for whichever union field is set on `grouping`."
  [^Grouping grouping]
  (grouping-constants (.getSetField grouping)))

(defn field-grouping
  "Return the field list of a :fields grouping. Throws
  IllegalArgumentException for any other grouping type."
  [^Grouping grouping]
  (when-not (= (grouping-type grouping) :fields)
    (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping")))
  (.get_fields grouping))

(defn global-grouping?
  "A global grouping is encoded on the wire as a fields grouping with an
  empty field list."
  [^Grouping grouping]
  (and (= :fields (grouping-type grouping))
       (empty? (field-grouping grouping))))

(defn parallelism-hint
  "Parallelism hint of `component-common`, defaulting to 1 when unset."
  [^ComponentCommon component-common]
  (let [phint (.get_parallelism_hint component-common)]
    (if-not (.is_set_parallelism_hint component-common) 1 phint)))

(defn nimbus-client-and-conn
  "Open a Thrift connection to Nimbus at host:port, optionally impersonating
  `as-user`. Returns [client transport]; the caller must close the
  transport."
  ([host port]
    (nimbus-client-and-conn host port nil))
  ([host port as-user]
    (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
    (let [conf (read-storm-config)
          nimbusClient (NimbusClient. conf host port nil as-user)
          client (.getClient nimbusClient)
          transport (.transport nimbusClient)]
      [client transport])))

(defmacro with-nimbus-connection
  "Evaluate body with client-sym bound to a Nimbus client connected to
  host:port; the connection is always closed afterwards."
  [[client-sym host port] & body]
  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
     (try
       ~@body
       (finally (.close conn#)))))

(defmacro with-configured-nimbus-connection
  "Like with-nimbus-connection, but locates Nimbus from the storm config and
  connects as the current request-context principal (if any)."
  [client-sym & body]
  `(let [conf# (read-storm-config)
         context# (ReqContext/context)
         user# (if (.principal context#) (.getName (.principal context#)))
         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
         ~client-sym (.getClient nimbusClient#)
         conn# (.transport nimbusClient#)]
     (try
       ~@body
       (finally (.close conn#)))))

(defn direct-output-fields
  "StreamInfo for a direct stream."
  [fields]
  (StreamInfo. fields true))

(defn output-fields
  "StreamInfo for a regular (non-direct) stream."
  [fields]
  (StreamInfo. fields false))

(defn mk-output-spec
  "Normalize an output spec into a map of stream id -> StreamInfo. A bare
  field list declares the default stream; field lists in map values become
  non-direct StreamInfos."
  [output-spec]
  (let [output-spec (if (map? output-spec)
                      output-spec
                      {Utils/DEFAULT_STREAM_ID output-spec})]
    (map-val
      (fn [out]
        (if (instance? StreamInfo out)
          out
          (StreamInfo. out false)))
      output-spec)))

;; Build a ComponentCommon from an inputs map, an output spec, an optional
;; parallelism hint, and optional component conf (serialized to JSON).
(defnk mk-plain-component-common
  [inputs output-spec parallelism-hint :conf nil]
  (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
    (when parallelism-hint
      (.set_parallelism_hint ret parallelism-hint))
    (when conf
      (.set_json_conf ret (to-json conf)))
    ret))

;; Build a Thrift SpoutSpec from a java-serializable spout object.
(defnk mk-spout-spec*
  [spout outputs :p nil :conf nil]
  (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
              (mk-plain-component-common {} outputs p :conf conf)))

(defn mk-shuffle-grouping
  []
  (Grouping/shuffle (NullStruct.)))

(defn mk-local-or-shuffle-grouping
  []
  (Grouping/local_or_shuffle (NullStruct.)))

(defn mk-fields-grouping
  [fields]
  (Grouping/fields fields))

(defn mk-global-grouping
  []
  ;; Global grouping is represented as a fields grouping on no fields.
  (mk-fields-grouping []))

(defn mk-direct-grouping
  []
  (Grouping/direct (NullStruct.)))

(defn mk-all-grouping
  []
  (Grouping/all (NullStruct.)))

(defn mk-none-grouping
  []
  (Grouping/none (NullStruct.)))

(defn deserialized-component-object
  "Deserialize the java-serialized payload of a ComponentObject. Throws
  RuntimeException for the shell/java-object variants."
  [^ComponentObject obj]
  (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
    (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
  (Utils/javaDeserialize (.get_serialized_java obj) Serializable))

(defn serialize-component-object
  [obj]
  (ComponentObject/serialized_java (Utils/javaSerialize obj)))

(defn- mk-grouping
  "Coerce a DSL grouping spec into a Thrift Grouping. Accepts nil (-> none),
  an existing Grouping, a CustomStreamGrouping, a JavaObject, a field
  sequence, or one of the keywords :shuffle, :local-or-shuffle, :none, :all,
  :global, :direct."
  [grouping-spec]
  (cond (nil? grouping-spec)
        (mk-none-grouping)

        (instance? Grouping grouping-spec)
        grouping-spec

        (instance? CustomStreamGrouping grouping-spec)
        (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))

        (instance? JavaObject grouping-spec)
        (Grouping/custom_object grouping-spec)

        (sequential? grouping-spec)
        (mk-fields-grouping grouping-spec)

        (= grouping-spec :shuffle)
        (mk-shuffle-grouping)

        (= grouping-spec :local-or-shuffle)
        (mk-local-or-shuffle-grouping)

        (= grouping-spec :none)
        (mk-none-grouping)

        (= grouping-spec :all)
        (mk-all-grouping)

        (= grouping-spec :global)
        (mk-global-grouping)

        (= grouping-spec :direct)
        (mk-direct-grouping)

        ;; idiomatic catch-all (was a literal `true` clause)
        :else
        (throw (IllegalArgumentException.
                 (str grouping-spec " is not a valid grouping")))))

(defn- mk-inputs
  "Convert a DSL inputs map to {GlobalStreamId Grouping}. A sequential
  stream id is [component-id stream-id]; a bare id targets the default
  stream."
  [inputs]
  (into {} (for [[stream-id grouping-spec] inputs]
             [(if (sequential? stream-id)
                (GlobalStreamId. (first stream-id) (second stream-id))
                (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
              (mk-grouping grouping-spec)])))

;; Build a Thrift Bolt from a java-serializable bolt object plus its inputs.
(defnk mk-bolt-spec*
  [inputs bolt outputs :p nil :conf nil]
  (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
    (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
           common)))

;; DSL spout spec map consumed by mk-topology. :p takes precedence over
;; :parallelism-hint when both are given.
(defnk mk-spout-spec
  [spout :parallelism-hint nil :p nil :conf nil]
  (let [parallelism-hint (if p p parallelism-hint)]
    {:obj spout :p parallelism-hint :conf conf}))

(defn- shell-component-params
  "Normalize the two shell-component call shapes into
  [command-array output-spec kwargs]. When script-or-output-spec is a string
  it is the script and the output spec is the first kwarg."
  [command script-or-output-spec kwargs]
  (if (string? script-or-output-spec)
    [(into-array String [command script-or-output-spec])
     (first kwargs)
     (rest kwargs)]
    [(into-array String command)
     script-or-output-spec
     kwargs]))

;; DSL bolt spec map consumed by mk-topology. :p takes precedence over
;; :parallelism-hint when both are given.
(defnk mk-bolt-spec
  [inputs bolt :parallelism-hint nil :p nil :conf nil]
  (let [parallelism-hint (if p p parallelism-hint)]
    {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))

(defn mk-shell-bolt-spec
  "Bolt spec backed by a multilang shell component."
  [inputs command script-or-output-spec & kwargs]
  (let [[command output-spec kwargs]
        (shell-component-params command script-or-output-spec kwargs)]
    (apply mk-bolt-spec inputs
           (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))

(defn mk-shell-spout-spec
  "Spout spec backed by a multilang shell component."
  [command script-or-output-spec & kwargs]
  (let [[command output-spec kwargs]
        (shell-component-params command script-or-output-spec kwargs)]
    (apply mk-spout-spec
           (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))

(defn- add-inputs
  [declarer inputs]
  (doseq [[id grouping] (mk-inputs inputs)]
    (.grouping declarer id grouping)))

(defn mk-topology
  "Build a StormTopology from maps of component name -> spec (as produced by
  mk-spout-spec / mk-bolt-spec). The three-argument arity accepts a
  state-spout-map for API compatibility but ignores it."
  ([spout-map bolt-map]
    (let [builder (TopologyBuilder.)]
      (doseq [[name {spout :obj p :p conf :conf}] spout-map]
        (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
      (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
        (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
      (.createTopology builder)))
  ([spout-map bolt-map state-spout-map]
    (mk-topology spout-map bolt-map)))

;; clojurify-structure is needed or else every element becomes the same after successive calls
;; don't know why this happens
(def STORM-TOPOLOGY-FIELDS
  (-> StormTopology/metaDataMap clojurify-structure keys))

(def SPOUT-FIELDS
  [StormTopology$_Fields/SPOUTS
   StormTopology$_Fields/STATE_SPOUTS])

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/timer.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj new file mode 100644 index 0000000..0d8839e --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/timer.clj @@ -0,0 +1,128 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License.
(ns org.apache.storm.timer
  (:import [org.apache.storm.utils Time])
  (:import [java.util PriorityQueue Comparator Random])
  (:import [java.util.concurrent Semaphore])
  (:use [org.apache.storm util log]))

;; The timer defined in this file is very similar to java.util.Timer, except
;; it integrates with Storm's time simulation capabilities. This lets us test
;; code that does asynchronous work on the timer thread

;; Create a timer. Queue entries are [end-time-millis afn id] vectors ordered
;; by scheduled time. kill-fn is called with the Throwable when the timer
;; thread dies on anything other than an interrupt.
(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
  (let [queue (PriorityQueue. 10 (reify Comparator
                                   (compare
                                     [this o1 o2]
                                     ;; Long/compare instead of (- t1 t2):
                                     ;; Comparator.compare must return an int,
                                     ;; and the long difference of two far-apart
                                     ;; schedule times does not fit in int range.
                                     (Long/compare (first o1) (first o2)))
                                   (equals
                                     [this obj]
                                     true)))
        active (atom true)
        lock (Object.)
        notifier (Semaphore. 0)
        thread-name (if timer-name timer-name "timer")
        timer-thread (Thread.
                       (fn []
                         (while @active
                           (try
                             (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
                               (if (and elem (>= (current-time-millis) time-millis))
                                 ;; It is imperative to not run the function
                                 ;; inside the timer lock. Otherwise, it is
                                 ;; possible to deadlock if the fn deals with
                                 ;; other locks, like the submit lock.
                                 (let [afn (locking lock (second (.poll queue)))]
                                   (afn))
                                 (if time-millis
                                   ;; If any events are scheduled, sleep until
                                   ;; event generation. If any recurring events
                                   ;; are scheduled then we will always go
                                   ;; through this branch, sleeping only the
                                   ;; exact necessary amount of time. We give
                                   ;; an upper bound, e.g. 1000 millis, to the
                                   ;; sleeping time, to limit the response time
                                   ;; for detecting any new event within 1 secs.
                                   (Time/sleep (min 1000 (- time-millis (current-time-millis))))
                                   ;; Otherwise poll to see if any new event
                                   ;; was scheduled. This is, in essence, the
                                   ;; response time for detecting any new event
                                   ;; schedulings when there are no scheduled
                                   ;; events.
                                   (Time/sleep 1000))))
                             (catch Throwable t
                               ;; Because the interrupted exception can be
                               ;; wrapped in a RuntimeException.
                               (when-not (exception-cause? InterruptedException t)
                                 (kill-fn t)
                                 (reset! active false)
                                 (throw t)))))
                         ;; Signals cancel-timer that the thread has exited.
                         (.release notifier)) thread-name)]
    (.setDaemon timer-thread true)
    (.setPriority timer-thread Thread/MAX_PRIORITY)
    (.start timer-thread)
    {:timer-thread timer-thread
     :queue queue
     :active active
     :lock lock
     :random (Random.)
     :cancel-notifier notifier}))

(defn- check-active!
  "Throw IllegalStateException if the timer has been cancelled."
  [timer]
  (when-not @(:active timer)
    (throw (IllegalStateException. "Timer is not active"))))

;; Schedule afn to run once, delay-secs from now, with optional random
;; jitter of up to jitter-ms milliseconds.
(defnk schedule
  [timer delay-secs afn :check-active true :jitter-ms 0]
  (when check-active (check-active! timer))
  (let [id (uuid)
        ^PriorityQueue queue (:queue timer)
        end-time-ms (+ (current-time-millis) (secs-to-millis-long delay-secs))
        end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
    (locking (:lock timer)
      (.add queue [end-time-ms afn id]))))

(defn schedule-recurring
  "Run afn after delay-secs and then every recur-secs thereafter."
  [timer delay-secs recur-secs afn]
  (schedule timer
            delay-secs
            (fn this []
              (afn)
              ; This avoids a race condition with cancel-timer.
              (schedule timer recur-secs this :check-active false))))

(defn schedule-recurring-with-jitter
  "Like schedule-recurring, with up to jitter-ms of random jitter on each
  recurrence."
  [timer delay-secs recur-secs jitter-ms afn]
  (schedule timer
            delay-secs
            (fn this []
              (afn)
              ; This avoids a race condition with cancel-timer.
              (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))

(defn cancel-timer
  "Deactivate the timer, interrupt its thread, and block until the thread
  has exited."
  [timer]
  (check-active! timer)
  (locking (:lock timer)
    (reset! (:active timer) false)
    (.interrupt (:timer-thread timer)))
  (.acquire (:cancel-notifier timer)))

(defn timer-waiting?
  "True when the timer thread is sleeping (used by simulated-time tests)."
  [timer]
  (Time/isThreadWaiting (:timer-thread timer)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/trident/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj new file mode 100644 index 0000000..44e5ca9 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj @@ -0,0 +1,79 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License.
(ns org.apache.storm.trident.testing
  (:require [org.apache.storm.LocalDRPC :as LocalDRPC])
  (:import [org.apache.storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
  (:require [org.apache.storm [LocalDRPC]])
  (:import [org.apache.storm LocalDRPC])
  (:import [org.apache.storm.tuple Fields])
  (:import [org.apache.storm.generated KillOptions])
  (:require [org.apache.storm [testing :as t]])
  (:use [org.apache.storm util]))

(defn local-drpc
  "Create an in-process DRPC server."
  []
  (LocalDRPC.))

(defn exec-drpc
  "Execute a DRPC function and parse its JSON-encoded result."
  [^LocalDRPC drpc function-name args]
  (let [res (.execute drpc function-name args)]
    (from-json res)))

(defn exec-drpc-tuples
  "Execute a DRPC function with tuples encoded as the JSON args string."
  [^LocalDRPC drpc function-name tuples]
  (exec-drpc drpc function-name (to-json tuples)))

(defn feeder-spout
  "Batch spout whose tuples are fed programmatically via `feed`."
  [fields]
  (FeederBatchSpout. fields))

(defn feeder-committer-spout
  "Committing variant of `feeder-spout`."
  [fields]
  (FeederCommitterBatchSpout. fields))

(defn feed
  [feeder tuples]
  (.feed feeder tuples))

(defn fields
  "Build a storm Fields object from field names."
  [& fields]
  (Fields. fields))

(defn memory-map-state
  []
  (MemoryMapState$Factory.))

(defmacro with-drpc
  "Evaluate body with drpc bound to a fresh LocalDRPC. The server is shut
  down in a finally block so it is not leaked when body throws."
  [[drpc] & body]
  `(let [~drpc (org.apache.storm.LocalDRPC.)]
     (try
       ~@body
       (finally (.shutdown ~drpc)))))

(defn with-topology*
  "Submit topo to the local cluster under the name \"tester\", run body-fn,
  then kill the topology immediately (wait_secs 0). The kill runs in a
  finally block so a failing body does not leave the topology running."
  [cluster topo body-fn]
  (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
  (try
    (body-fn)
    (finally
      (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))))))

(defmacro with-topology
  [[cluster topo] & body]
  `(with-topology* ~cluster ~topo (fn [] ~@body)))

(defn bootstrap-imports
  "REPL helper: import commonly used trident classes into the calling
  namespace."
  []
  (import 'org.apache.storm.LocalDRPC)
  (import 'org.apache.storm.trident.TridentTopology)
  (import '[org.apache.storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN TupleCollectionGet]))

(defn drpc-tuples-input
  "Attach a DRPC stream named function-name to topology, tuplifying the JSON
  args into outfields."
  [topology function-name drpc outfields]
  (-> topology
      (.newDRPCStream function-name drpc)
      (.each (fields "args") (TuplifyArgs.) outfields)))

