http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/util.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj deleted file mode 100644 index 75ea0dc..0000000 --- a/storm-core/src/clj/backtype/storm/util.clj +++ /dev/null @@ -1,1134 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
(ns org.apache.storm.util
  (:import [java.net InetAddress])
  (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap])
  (:import [java.io FileReader FileNotFoundException])
  (:import [java.nio.file Paths])
  (:import [org.apache.storm Config])
  (:import [org.apache.storm.utils Time Container ClojureTimerTask Utils
            MutableObject MutableInt])
  (:import [org.apache.storm.security.auth NimbusPrincipal])
  (:import [javax.security.auth Subject])
  (:import [java.util UUID Random ArrayList List Collections])
  (:import [java.util.zip ZipFile])
  (:import [java.util.concurrent.locks ReentrantReadWriteLock])
  (:import [java.util.concurrent Semaphore])
  (:import [java.nio.file Files Paths])
  (:import [java.nio.file.attribute FileAttribute])
  (:import [java.io File FileOutputStream RandomAccessFile StringWriter
            PrintWriter BufferedReader InputStreamReader IOException])
  (:import [java.lang.management ManagementFactory])
  (:import [org.apache.commons.exec DefaultExecutor CommandLine])
  (:import [org.apache.commons.io FileUtils])
  (:import [org.apache.storm.logging ThriftAccessLogger])
  (:import [org.apache.commons.exec ExecuteException])
  (:import [org.json.simple JSONValue])
  (:import [org.yaml.snakeyaml Yaml]
           [org.yaml.snakeyaml.constructor SafeConstructor])
  (:require [clojure [string :as str]])
  (:import [clojure.lang RT])
  (:require [clojure [set :as set]])
  (:require [clojure.java.io :as io])
  (:use [clojure walk])
  (:require [ring.util.codec :as codec])
  (:use [org.apache.storm log]))

(defn wrap-in-runtime
  "Wraps an exception in a RuntimeException if needed"
  [^Exception e]
  (if (instance? RuntimeException e)
    e
    (RuntimeException. e)))

(def on-windows?
  "True when the OS environment variable equals Windows_NT."
  (= "Windows_NT" (System/getenv "OS")))

(def file-path-separator
  "Value of the file.separator system property."
  (System/getProperty "file.separator"))

(def class-path-separator
  "Value of the path.separator system property."
  (System/getProperty "path.separator"))

(defn is-absolute-path?
  "Returns true if the given path string is absolute according to java.nio.file.Paths."
  [path]
  (.isAbsolute (Paths/get path (into-array String []))))

(defmacro defalias
  "Defines an alias for a var: a new var with the same root binding (if
  any) and similar metadata. The metadata of the alias is its initial
  metadata (as provided by def) merged into the metadata of the original."
  ([name orig]
   `(do
      (alter-meta!
        (if (.hasRoot (var ~orig))
          (def ~name (.getRawRoot (var ~orig)))
          (def ~name))
        ;; When copying metadata, disregard {:macro false}.
        ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
        #(conj (dissoc % :macro)
               (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
      (var ~name)))
  ([name orig doc]
   (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))

;; name-with-attributes by Konrad Hinsen:
(defn name-with-attributes
  "To be used in macro definitions.
  Handles optional docstrings and attribute maps for a name to be defined
  in a list of macro arguments. If the first macro argument is a string,
  it is added as a docstring to name and removed from the macro argument
  list. If afterwards the first macro argument is a map, its entries are
  added to the name's metadata map and the map is removed from the
  macro argument list. The return value is a vector containing the name
  with its extended metadata map and the list of unprocessed macro
  arguments."
  [name macro-args]
  (let [[docstring macro-args] (if (string? (first macro-args))
                                 [(first macro-args) (next macro-args)]
                                 [nil macro-args])
        [attr macro-args] (if (map? (first macro-args))
                            [(first macro-args) (next macro-args)]
                            [{} macro-args])
        attr (if docstring
               (assoc attr :doc docstring)
               attr)
        attr (if (meta name)
               (conj (meta name) attr)
               attr)]
    [(with-meta name attr) macro-args]))

(defmacro defnk
  "Define a function accepting keyword arguments. Symbols up to the first
  keyword in the parameter list are taken as positional arguments. Then
  an alternating sequence of keywords and defaults values is expected. The
  values of the keyword arguments are available in the function body by
  virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
  defnk accepts an optional docstring as well as an optional metadata map."
  [fn-name & fn-tail]
  (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
        [pos kw-vals] (split-with symbol? args)
        syms (map #(-> % name symbol) (take-nth 2 kw-vals))
        values (take-nth 2 (rest kw-vals))
        sym-vals (apply hash-map (interleave syms values))
        de-map {:keys (vec syms) :or sym-vals}]
    `(defn ~fn-name
       [~@pos & options#]
       (let [~de-map (apply hash-map options#)]
         ~@body))))

(defn find-first
  "Returns the first item of coll for which (pred item) returns logical true.
  Consumes sequences up to the first match, will consume the entire sequence
  and return nil if no match is found."
  [pred coll]
  (first (filter pred coll)))

(defn dissoc-in
  "Dissociates an entry from a nested associative structure returning a new
  nested structure. keys is a sequence of keys. Any empty maps that result
  will not be present in the new structure."
  [m [k & ks :as keys]]
  (if ks
    (if-let [nextmap (get m k)]
      (let [newmap (dissoc-in nextmap ks)]
        (if (seq newmap)
          (assoc m k newmap)
          (dissoc m k)))
      m)
    (dissoc m k)))

(defn indexed
  "Returns a lazy sequence of [index, item] pairs, where items come
  from 's' and indexes count up from zero.

  (indexed '(a b c d)) => ([0 a] [1 b] [2 c] [3 d])"
  [s]
  (map vector (iterate inc 0) s))

(defn positions
  "Returns a lazy sequence containing the positions at which pred
  is true for items in coll."
  [pred coll]
  (for [[idx elt] (indexed coll) :when (pred elt)] idx))

(defn exception-cause?
  "Returns true if t, or any throwable in its cause chain, is an instance of klass."
  [klass ^Throwable t]
  (->> (iterate #(.getCause ^Throwable %) t)
       (take-while identity)
       (some (partial instance? klass))
       boolean))

(defmacro thrown-cause?
  "Evaluates body; yields false if nothing is thrown, otherwise whether the
  thrown Throwable has klass somewhere in its cause chain."
  [klass & body]
  `(try
     ~@body
     false
     (catch Throwable t#
       (exception-cause? ~klass t#))))

(defmacro thrown-cause-with-msg?
  "Like thrown-cause?, but additionally requires the message of the thrown
  Throwable to match the regex re. A Throwable without a message never matches."
  [klass re & body]
  `(try
     ~@body
     false
     (catch Throwable t#
       ;; getMessage may be nil; re-matches on nil would throw a
       ;; NullPointerException out of this catch block.
       (let [msg# (.getMessage t#)]
         (and msg#
              (re-matches ~re msg#)
              (exception-cause? ~klass t#))))))

(defmacro forcat
  "mapcat with for-like syntax: binds args to each element of aseq and
  concatenates the results of body."
  [[args aseq] & body]
  `(mapcat (fn [~args]
             ~@body)
           ~aseq))

(defmacro try-cause
  "Like try, but each (catch klass local ...) clause matches when klass appears
  anywhere in the cause chain of the thrown Throwable. The local is bound to the
  top-level Throwable; a Throwable matching no clause is rethrown."
  [& body]
  (let [checker (fn [form]
                  (or (not (sequential? form))
                      (not= 'catch (first form))))
        [code guards] (split-with checker body)
        error-local (gensym "t")
        guards (forcat [[_ klass local & guard-body] guards]
                 `((exception-cause? ~klass ~error-local)
                   (let [~local ~error-local]
                     ~@guard-body
                     )))]
    `(try ~@code
          (catch Throwable ~error-local
            (cond ~@guards
                  true (throw ~error-local)
                  )))))

(defn local-hostname
  "Returns the canonical host name of the local host."
  []
  (.getCanonicalHostName (InetAddress/getLocalHost)))

(def memoized-local-hostname (memoize local-hostname))

;; checks conf for STORM_LOCAL_HOSTNAME.
;; when unconfigured, falls back to (memoized) guess by `local-hostname`.
(defn hostname
  [conf]
  (conf Config/STORM_LOCAL_HOSTNAME (memoized-local-hostname)))

(letfn [(try-port [port]
          (with-open [socket (java.net.ServerSocket. port)]
            (.getLocalPort socket)))]
  (defn available-port
    "Returns a port a ServerSocket could be bound to. With no argument any free
    port is chosen; with a preferred port, that port is tried first and any free
    port is used if binding it raises an IOException."
    ([] (try-port 0))
    ([preferred]
     (try
       (try-port preferred)
       (catch java.io.IOException e
         (available-port))))))

(defn uuid
  "Returns a random UUID as a string."
  []
  (str (UUID/randomUUID)))

(defn current-time-secs
  "Current time in seconds as reported by org.apache.storm.utils.Time."
  []
  (Time/currentTimeSecs))

(defn current-time-millis
  "Current time in milliseconds as reported by org.apache.storm.utils.Time."
  []
  (Time/currentTimeMillis))

(defn secs-to-millis-long
  "Multiplies secs by 1000 and coerces the result to a long."
  [secs]
  (long (* (long 1000) secs)))

(defn clojurify-structure
  "Walks s, converting java.util.Map instances to Clojure maps, java.util.List
  instances to vectors and Boolean objects to primitive-style booleans."
  [s]
  (prewalk (fn [x]
             (cond (instance? Map x) (into {} x)
                   (instance? List x) (vec x)
                   ;; (Boolean. false) does not evaluate to false in an if.
                   ;; This fixes that.
                   (instance? Boolean x) (boolean x)
                   true x))
           s))

(defmacro with-file-lock
  "Runs body while holding a file-channel lock on the file at path (created if
  missing). The lock is released and the file closed afterwards."
  [path & body]
  `(let [f# (File. ~path)
         _# (.createNewFile f#)
         rf# (RandomAccessFile. f# "rw")
         lock# (.. rf# (getChannel) (lock))]
     (try
       ~@body
       (finally
         (.release lock#)
         (.close rf#)))))

(defn tokenize-path
  "Splits path on / and returns a vector of the non-empty components."
  [^String path]
  (let [toks (.split path "/")]
    (vec (filter (complement empty?) toks))))

(defn assoc-conj
  "Appends v to the sequence stored under k in m (via concat)."
  [m k v]
  (merge-with concat m {k [v]}))

;; returns [ones in first set not in second, ones in second set not in first]
(defn set-delta
  [old curr]
  (let [s1 (set old)
        s2 (set curr)]
    [(set/difference s1 s2) (set/difference s2 s1)]))

(defn parent-path
  "Returns the /-separated path with its last component removed."
  [path]
  (let [toks (tokenize-path path)]
    (str "/" (str/join "/" (butlast toks)))))

(defn toks->path
  "Joins path components with / and prefixes the result with /."
  [toks]
  (str "/" (str/join "/" toks)))

(defn normalize-path
  "Re-builds path from its non-empty components, dropping duplicate slashes."
  [^String path]
  (toks->path (tokenize-path path)))

(defn map-val
  "Returns a map with afn applied to every value of amap."
  [afn amap]
  (into {}
        (for [[k v] amap]
          [k (afn v)])))

(defn filter-val
  "Returns a map of the entries of amap whose value satisfies afn."
  [afn amap]
  (into {} (filter (fn [[k v]] (afn v)) amap)))

(defn filter-key
  "Returns a map of the entries of amap whose key satisfies afn."
  [afn amap]
  (into {} (filter (fn [[k v]] (afn k)) amap)))

(defn map-key
  "Returns a map with afn applied to every key of amap."
  [afn amap]
  (into {} (for [[k v] amap] [(afn k) v])))

(defn separate
  "Returns [items satisfying pred, items not satisfying pred]."
  [pred aseq]
  [(filter pred aseq) (filter (complement pred) aseq)])

(defn full-path
  "Appends name to the normalized components of parent and returns the path."
  [parent name]
  (let [toks (tokenize-path parent)]
    (toks->path (conj toks name))))

(def not-nil? (complement nil?))

(defn barr
  "Builds a byte array from the given values."
  [& vals]
  (byte-array (map byte vals)))

(defn exit-process!
  "Logs the reason given by msg and exits the JVM with status val."
  [val & msg]
  ;; msg is a varargs seq; join it so the log shows plain text rather than the
  ;; printed form of a seq, e.g. ("Async loop died!").
  (let [reason (str/join " " msg)]
    (log-error (RuntimeException. reason) "Halting process: " reason))
  (.exit (Runtime/getRuntime) val))

(defn sum
  "Adds up all numbers in vals."
  [vals]
  (reduce + vals))

(defn repeat-seq
  "Concatenates repetitions of aseq: infinitely many, or amt of them."
  ([aseq]
   (apply concat (repeat aseq)))
  ([amt aseq]
   (apply concat (repeat amt aseq))))

(defn div
  "Perform floating point division on the arguments."
  [f & rest]
  (apply / (double f) rest))

(defn defaulted
  "Returns val when it is logically true, otherwise default."
  [val default]
  (if val val default))

(defn mk-counter
  "Returns a function that yields start-val (default 1) on its first call and
  the next integer on each subsequent call."
  ([] (mk-counter 1))
  ([start-val]
   (let [val (atom (dec start-val))]
     (fn [] (swap! val inc)))))

(defmacro for-times
  "Lazily evaluates body once per element of (range times)."
  [times & body]
  `(for [i# (range ~times)]
     ~@body))

(defmacro dofor
  "A for comprehension that is fully realized with doall."
  [& body]
  `(doall (for ~@body)))

(defn reverse-map
  "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}"
  [amap]
  (reduce (fn [m [k v]]
            (let [existing (get m v [])]
              (assoc m v (conj existing k))))
          {} amap))

(defmacro print-vars
  "Prints each given symbol followed by its value."
  [& vars]
  (let [prints (for [v vars] `(println ~(str v) ~v))]
    `(do ~@prints)))

(defn process-pid
  "Gets the pid of this JVM. Hacky because Java doesn't provide a real way to do this."
  []
  (let [name (.getName (ManagementFactory/getRuntimeMXBean))
        split (.split name "@")]
    (when-not (= 2 (count split))
      (throw (RuntimeException. (str "Got unexpected process name: " name))))
    (first split)))

(defn exec-command!
  "Splits command on single spaces and executes it with a DefaultExecutor."
  [command]
  (let [[comm-str & args] (seq (.split command " "))
        command (CommandLine. comm-str)]
    (doseq [a args]
      (.addArgument command a))
    (.execute (DefaultExecutor.) command)))

(defn extract-dir-from-jar
  "Copies every non-directory entry of the jar whose name starts with dir into
  destdir. An IOException is logged rather than propagated."
  [jarpath dir destdir]
  (try-cause
    (with-open [jarpath (ZipFile. jarpath)]
      (let [entries (enumeration-seq (.entries jarpath))]
        (doseq [file (filter (fn [entry] (and (not (.isDirectory entry)) (.startsWith (.getName entry) dir))) entries)]
          (.mkdirs (.getParentFile (File. destdir (.getName file))))
          (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
            (io/copy (.getInputStream jarpath file) out)))))
    (catch IOException e
      (log-message "Could not extract " dir " from " jarpath))))

(defn sleep-secs
  "Sleeps for secs seconds via Time/sleep; does nothing unless secs is positive."
  [secs]
  (when (pos? secs)
    (Time/sleep (* (long secs) 1000))))

(defn sleep-until-secs
  "Calls Time/sleepUntil with target-secs converted to milliseconds."
  [target-secs]
  (Time/sleepUntil (* (long target-secs) 1000)))

(def ^:const sig-kill 9)

(def ^:const sig-term 15)

(defn send-signal-to-process
  "Sends signum to pid using kill, or taskkill on Windows. An ExecuteException
  is logged rather than propagated."
  [pid signum]
  (try-cause
    (exec-command! (str (if on-windows?
                          (if (== signum sig-kill) "taskkill /f /pid " "taskkill /pid ")
                          (str "kill -" signum " "))
                        pid))
    (catch ExecuteException e
      (log-message "Error when trying to kill " pid ". Process is probably already dead."))))

(defn read-and-log-stream
  "Reads stream line by line until end of stream, logging each line at warn
  level prefixed with prefix."
  [prefix stream]
  (try
    (let [reader (BufferedReader. (InputStreamReader. stream))]
      (loop []
        (if-let [line (.readLine reader)]
          (do
            (log-warn (str prefix ":" line))
            (recur)))))
    (catch IOException e
      (log-warn "Error while trying to log stream" e))))

(defn force-kill-process
  "Sends sig-kill to pid."
  [pid]
  (send-signal-to-process pid sig-kill))

(defn kill-process-with-sig-term
  "Sends sig-term to pid."
  [pid]
  (send-signal-to-process pid sig-term))

(defn add-shutdown-hook-with-force-kill-in-1-sec
  "adds the user supplied function as a shutdown hook for cleanup.
  Also adds a function that sleeps for a second and then sends kill -9 to process to avoid any zombie process in case
  cleanup function hangs."
  [func]
  (.addShutdownHook (Runtime/getRuntime) (Thread. #(func)))
  ;; Sleep, then halt, as two sequential statements. The previous form
  ;; #((sleep-secs 1) (.halt ...)) invoked the nil result of sleep-secs as a
  ;; function and only worked because the argument was evaluated first.
  (.addShutdownHook (Runtime/getRuntime) (Thread. (fn []
                                                    (sleep-secs 1)
                                                    (.halt (Runtime/getRuntime) 20)))))

(defprotocol SmartThread
  (start [this])
  (join [this])
  (interrupt [this])
  (sleeping? [this]))

;; afn returns amount of time to sleep
(defnk async-loop [afn
                   :daemon false
                   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))
                   :priority Thread/NORM_PRIORITY
                   :factory? false
                   :start true
                   :thread-name nil]
  (let [thread (Thread.
                 (fn []
                   (try-cause
                     (let [afn (if factory? (afn) afn)]
                       (loop []
                         (let [sleep-time (afn)]
                           (when-not (nil? sleep-time)
                             (sleep-secs sleep-time)
                             (recur))
                           )))
                     (catch InterruptedException e
                       (log-message "Async loop interrupted!")
                       )
                     (catch Throwable t
                       (log-error t "Async loop died!")
                       (kill-fn t)))))]
    (.setDaemon thread daemon)
    (.setPriority thread priority)
    (when-not (clojure.string/blank? thread-name)
      ;; if thread-name is blank, just use the default thread name
      (.setName thread (str (.getName thread) "-" thread-name)))
    (when start
      (.start thread))
    ;; should return object that supports stop, interrupt, join, and waiting?
    (reify SmartThread
      (start
        [this]
        (.start thread))
      (join
        [this]
        (.join thread))
      (interrupt
        [this]
        (.interrupt thread))
      (sleeping?
        [this]
        (Time/isThreadWaiting thread)))))

(defn shell-cmd
  "Single-quotes every element of command (escaping embedded single quotes) and
  joins the results with spaces."
  [command]
  (->> command
       (map #(str \' (clojure.string/escape % {\' "'\"'\"'"}) \'))
       (clojure.string/join " ")))

(defn script-file-path
  "Path of storm-worker-script.sh inside dir."
  [dir]
  (str dir file-path-separator "storm-worker-script.sh"))

(defn container-file-path
  "Path of launch_container.sh inside dir."
  [dir]
  (str dir file-path-separator "launch_container.sh"))

;; Writes a bash script into dir that exports the given environment and then
;; execs command; returns the path of the script.
(defnk write-script
  [dir command :environment {}]
  (let [script-src (str "#!/bin/bash\n" (clojure.string/join "" (map (fn [[k v]] (str (shell-cmd ["export" (str k "=" v)]) ";\n")) environment)) "\nexec " (shell-cmd command) ";")
        script-path (script-file-path dir)
        _ (spit script-path script-src)]
    script-path
    ))

;; Starts command as a child process (stderr merged into stdout) and returns the
;; Process. When log-prefix or exit-code-callback is given, a background
;; async-loop logs the output and/or reports the exit code.
(defnk launch-process
  [command :environment {} :log-prefix nil :exit-code-callback nil :directory nil]
  (let [builder (ProcessBuilder. command)
        process-env (.environment builder)]
    (when directory (.directory builder directory))
    (.redirectErrorStream builder true)
    (doseq [[k v] environment]
      (.put process-env k v))
    (let [process (.start builder)]
      (if (or log-prefix exit-code-callback)
        (async-loop
          (fn []
            (if log-prefix
              (read-and-log-stream log-prefix (.getInputStream process)))
            (when exit-code-callback
              (try
                (.waitFor process)
                (catch InterruptedException e
                  (log-message log-prefix " interrupted.")))
              (exit-code-callback (.exitValue process)))
            ;; returning nil ends the async-loop after a single pass
            nil)))
      process)))

(defn exists-file?
  "Returns true if a file or directory exists at path."
  [path]
  (.exists (File. path)))

(defn rmr
  "Deletes the file or directory at path (via FileUtils/forceDelete) if it exists."
  [path]
  (log-debug "Rmr path " path)
  (when (exists-file? path)
    (try
      (FileUtils/forceDelete (File. path))
      (catch FileNotFoundException e))))

(defn rmpath
  "Removes file or directory at the path. Not recursive. Throws exception on failure"
  [path]
  (log-debug "Removing path " path)
  (when (exists-file? path)
    (let [deleted? (.delete (File. path))]
      (when-not deleted?
        (throw (RuntimeException. (str "Failed to delete " path)))))))

(defn local-mkdirs
  "Creates the directory at path, including missing parents."
  [path]
  (log-debug "Making dirs at " path)
  (FileUtils/forceMkdir (File. path)))

(defn touch
  "Creates a new empty file at path; throws a RuntimeException if createNewFile
  reports failure."
  [path]
  (log-debug "Touching file at " path)
  (let [success? (do (if on-windows? (.mkdirs (.getParentFile (File. path))))
                     (.createNewFile (File. path)))]
    (when-not success?
      (throw (RuntimeException. (str "Failed to touch " path))))))

(defn create-symlink!
  "Create symlink is to the target"
  ([path-dir target-dir file-name]
   (create-symlink! path-dir target-dir file-name file-name))
  ([path-dir target-dir from-file-name to-file-name]
   (let [path (str path-dir file-path-separator from-file-name)
         target (str target-dir file-path-separator to-file-name)
         empty-array (make-array String 0)
         attrs (make-array FileAttribute 0)
         abs-path (.toAbsolutePath (Paths/get path empty-array))
         abs-target (.toAbsolutePath (Paths/get target empty-array))]
     (log-debug "Creating symlink [" abs-path "] to [" abs-target "]")
     (if (not (.exists (.toFile abs-path)))
       (Files/createSymbolicLink abs-path abs-target attrs)))))

(defn read-dir-contents
  "Returns the names of the entries of dir, or an empty vector if dir does not exist."
  [dir]
  (if (exists-file? dir)
    (let [content-files (.listFiles (File. dir))]
      (map #(.getName ^File %) content-files))
    []))

(defn compact
  "Removes nil elements from aseq."
  [aseq]
  (filter (complement nil?) aseq))

(defn current-classpath
  "Value of the java.class.path system property."
  []
  (System/getProperty "java.class.path"))

(defn get-full-jars
  "Returns the paths of the entries of dir whose names end in .jar."
  [dir]
  (map #(str dir file-path-separator %) (filter #(.endsWith % ".jar") (read-dir-contents dir))))

(defn worker-classpath
  "Builds a classpath from the jars under the lib and extlib directories of
  storm.home, STORM_EXT_CLASSPATH and the conf directory. Falls back to the
  current classpath when the storm.home property is not set."
  []
  (let [storm-dir (System/getProperty "storm.home")
        storm-lib-dir (str storm-dir file-path-separator "lib")
        storm-conf-dir (if-let [confdir (System/getenv "STORM_CONF_DIR")]
                         confdir
                         (str storm-dir file-path-separator "conf"))
        storm-extlib-dir (str storm-dir file-path-separator "extlib")
        extcp (System/getenv "STORM_EXT_CLASSPATH")]
    (if (nil? storm-dir)
      (current-classpath)
      (str/join class-path-separator
                (remove nil? (concat (get-full-jars storm-lib-dir) (get-full-jars storm-extlib-dir) [extcp] [storm-conf-dir]))))))

(defn add-to-classpath
  "Appends paths to classpath using the class path separator."
  [classpath paths]
  (if (empty? paths)
    classpath
    (str/join class-path-separator (cons classpath paths))))

(defn ^ReentrantReadWriteLock mk-rw-lock
  []
  (ReentrantReadWriteLock.))

(defmacro read-locked
  "Evaluates body while holding the read lock of rw-lock."
  [rw-lock & body]
  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
    `(let [rlock# (.readLock ~lock)]
       (try (.lock rlock#)
            ~@body
            (finally (.unlock rlock#))))))

(defmacro write-locked
  "Evaluates body while holding the write lock of rw-lock."
  [rw-lock & body]
  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
    `(let [wlock# (.writeLock ~lock)]
       (try (.lock wlock#)
            ~@body
            (finally (.unlock wlock#))))))

(defn time-delta
  "Seconds between time-secs and (current-time-secs)."
  [time-secs]
  (- (current-time-secs) time-secs))

(defn time-delta-ms
  "Milliseconds between time-ms and System/currentTimeMillis."
  [time-ms]
  (- (System/currentTimeMillis) (long time-ms)))

(defn parse-int
  "Parses str with Integer/valueOf."
  [str]
  (Integer/valueOf str))

(defn integer-divided
  "Clojurified result of Utils/integerDivided for sum and num-pieces."
  [sum num-pieces]
  (clojurify-structure (Utils/integerDivided sum num-pieces)))

(defn collectify
  "Returns obj unchanged if it is sequential or a java.util.Collection,
  otherwise wraps it in a vector."
  [obj]
  (if (or (sequential? obj) (instance? Collection obj))
    obj
    [obj]))

(defn to-json
  "Serializes obj to a JSON string."
  [obj]
  (JSONValue/toJSONString obj))

(defn from-json
  "Parses the JSON string str into Clojure data; returns nil for nil input."
  [^String str]
  (if str
    (clojurify-structure
      (JSONValue/parse str))
    nil))

(defmacro letlocals
  "Expands body into a let: every (bind sym expr) form becomes a binding, any
  other form is evaluated for side effects, and the last form is the result."
  [& body]
  (let [[tobind lexpr] (split-at (dec (count body)) body)
        binded (vec (mapcat (fn [e]
                              (if (and (list? e) (= 'bind (first e)))
                                [(second e) (last e)]
                                ['_ e]
                                ))
                            tobind))]
    `(let ~binded
       ~(first lexpr))))

(defn remove-first
  "Removes the first element of aseq satisfying pred; throws
  IllegalArgumentException if there is none."
  [pred aseq]
  (let [[b e] (split-with (complement pred) aseq)]
    (when (empty? e)
      (throw (IllegalArgumentException. "Nothing to remove")))
    (concat b (rest e))))

(defn assoc-non-nil
  "Associates k with v in m only when v is logically true."
  [m k v]
  (if v (assoc m k v) m))

(defn multi-set
  "Returns a map of elem to count"
  [aseq]
  (apply merge-with +
         (map #(hash-map % 1) aseq)))

(defn set-var-root*
  "Sets the root binding of avar to val."
  [avar val]
  (alter-var-root avar (fn [avar] val)))

(defmacro set-var-root
  "Sets the root binding of the var named by var-sym to val."
  [var-sym val]
  `(set-var-root* (var ~var-sym) ~val))

(defmacro with-var-roots
  "Temporarily sets the root bindings of the given vars while body runs and
  restores the previous values afterwards."
  [bindings & body]
  (let [settings (partition 2 bindings)
        tmpvars (repeatedly (count settings) (partial gensym "old"))
        vars (map first settings)
        savevals (vec (mapcat (fn [t v] [t v]) tmpvars vars))
        setters (for [[v s] settings] `(set-var-root ~v ~s))
        restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)]
    `(let ~savevals
       ~@setters
       (try
         ~@body
         (finally
           ~@restorers)))))

(defn map-diff
  "Returns mappings in m2 that aren't in m1"
  [m1 m2]
  (into {} (filter (fn [[k v]] (not= v (m1 k))) m2)))

(defn select-keys-pred
  "Returns a map of the entries of amap whose key satisfies pred."
  [pred amap]
  (into {} (filter (fn [[k v]] (pred k)) amap)))

(defn rotating-random-range
  "Returns the state triple [index, shuffled copy of choices, Random] consumed
  by acquire-random-range-id."
  [choices]
  (let [rand (Random.)
        choices (ArrayList. choices)]
    (Collections/shuffle choices rand)
    [(MutableInt. -1) choices rand]))

(defn acquire-random-range-id
  "Advances the index of a rotating-random-range state and returns the element
  at that position, reshuffling and restarting once the end is reached."
  [[^MutableInt curr ^List state ^Random rand]]
  (when (>= (.increment curr) (.size state))
    (.set curr 0)
    (Collections/shuffle state rand))
  (.get state (.get curr)))

; this can be rewritten to be tail recursive
(defn interleave-all
  [& colls]
  (if (empty? colls)
    []
    (let [colls (filter (complement empty?) colls)
          my-elems (map first colls)
          rest-elems (apply interleave-all (map rest colls))]
      (concat my-elems rest-elems))))

(defn any-intersection
  "Returns the elements that occur more than once across the concatenation of sets."
  [& sets]
  (let [elem->count (multi-set (apply concat sets))]
    (-> (filter-val #(> % 1) elem->count)
        keys)))

(defn between?
  "val >= lower and val <= upper"
  [val lower upper]
  (and (>= val lower)
       (<= val upper)))

(defmacro benchmark
  "Times one million evaluations of body."
  [& body]
  `(let [l# (doall (range 1000000))]
     (time
       (doseq [i# l#]
         ~@body))))

(defn rand-sampler
  "Returns a function that returns true when a random int in [0, freq) is 0."
  [freq]
  (let [r (java.util.Random.)]
    (fn [] (= 0 (.nextInt r freq)))))

(defn even-sampler
  "Returns a function that returns true once in every window of freq calls, at
  a randomly chosen position. The rate is attached as :rate metadata."
  [freq]
  (let [freq (int freq)
        start (int 0)
        r (java.util.Random.)
        curr (MutableInt. -1)
        target (MutableInt. (.nextInt r freq))]
    (with-meta
      (fn []
        (let [i (.increment curr)]
          (when (>= i freq)
            (.set curr start)
            (.set target (.nextInt r freq))))
        (= (.get curr) (.get target)))
      {:rate freq})))

(defn sampler-rate
  "Returns the :rate metadata of sampler."
  [sampler]
  (:rate (meta sampler)))

(defn class-selector
  "Returns the class of obj, ignoring any further arguments."
  [obj & args]
  (class obj))

(defn uptime-computer
  "Returns a function reporting the seconds elapsed since this call."
  []
  (let [start-time (current-time-secs)]
    (fn [] (time-delta start-time))))

(defn stringify-error
  "Returns the stack trace of error as a string."
  [error]
  (let [result (StringWriter.)
        printer (PrintWriter. result)]
    (.printStackTrace error printer)
    (.toString result)))

(defn nil-to-zero
  "Returns v, or 0 when v is nil or false."
  [v]
  (or v 0))

(defmacro with-error-reaction
  "Evaluates body, calling afn with any Throwable it throws."
  [afn & body]
  `(try ~@body
        (catch Throwable t# (~afn t#))))

(defn container
  "Creates a new, empty Container."
  []
  (Container.))

(defn container-set!
  "Stores obj in the object field of container and returns the container."
  [^Container container obj]
  (set! (. container object) obj)
  container)

(defn container-get
  "Returns the object field of container."
  [^Container container]
  (. container object))

(defn to-millis
  "Converts secs to milliseconds."
  [secs]
  (* 1000 (long secs)))

(defn throw-runtime
  "Throws a RuntimeException whose message is the concatenation of strs."
  [& strs]
  (throw (RuntimeException. (apply str strs))))

(defn redirect-stdio-to-slf4j!
  []
  ;; set-var-root doesn't work with *out* and *err*, so digging much deeper here
  ;; Unfortunately, this code seems to work at the REPL but not when spawned as worker processes
  ;; it might have something to do with being a child process
  ;; (set! (. (.getThreadBinding RT/OUT) val)
  ;;       (java.io.OutputStreamWriter.
  ;;         (log-stream :info "STDIO")))
  ;; (set! (. (.getThreadBinding RT/ERR) val)
  ;;       (PrintWriter.
  ;;         (java.io.OutputStreamWriter.
  ;;           (log-stream :error "STDIO"))
  ;;         true))
  (log-capture! "STDIO"))

(defn spy
  "Logs val with prefix and returns val."
  [prefix val]
  (log-message prefix ": " val)
  val)

(defn zip-contains-dir?
  "Returns true if the zip file contains at least one entry whose name starts
  with target followed by a slash."
  [zipfile target]
  ;; with-open closes the ZipFile; previously the handle was never released.
  ;; `some` is eager, so the entries are consumed before the file is closed.
  (with-open [zip (ZipFile. zipfile)]
    (let [entries (->> zip .entries enumeration-seq (map (memfn getName)))]
      (boolean (some #(.startsWith % (str target "/")) entries)))))

(defn url-encode
  [s]
  (codec/url-encode s))

(defn url-decode
  [s]
  (codec/url-decode s))

(defn join-maps
  "Returns a map from every key present in any of maps to the sequence of that
  key's values in each map, in argument order."
  [& maps]
  (let [all-keys (apply set/union (for [m maps] (-> m keys set)))]
    (into {} (for [k all-keys]
               [k (for [m maps] (m k))]))))

(defn partition-fixed
  "Splits aseq into at most max-num-chunks consecutive chunks whose sizes are
  given by integer-divided, larger chunks first."
  [max-num-chunks aseq]
  (if (zero? max-num-chunks)
    []
    (let [chunks (->> (integer-divided (count aseq) max-num-chunks)
                      (#(dissoc % 0))
                      (sort-by (comp - first))
                      (mapcat (fn [[size amt]] (repeat amt size)))
                      )]
      (loop [result []
             [chunk & rest-chunks] chunks
             data aseq]
        (if (nil? chunk)
          result
          (let [[c rest-data] (split-at chunk data)]
            (recur (conj result c)
                   rest-chunks
                   rest-data)))))))


(defn assoc-apply-self
  "Associates key in curr with the result of calling afn on curr."
  [curr key afn]
  (assoc curr key (afn curr)))

(defmacro recursive-map
  "Builds a map from key/form pairs in order; each form may refer to the map
  built so far through the symbol <>."
  [& forms]
  (->> (partition 2 forms)
       (map (fn [[key form]] `(assoc-apply-self ~key (fn [~'<>] ~form))))
       (concat `(-> {}))))

(defn current-stack-trace
  "Returns the stack trace of the current thread, one frame per line."
  []
  (->> (Thread/currentThread)
       .getStackTrace
       (map str)
       (str/join "\n")))

(defn get-iterator
  "Returns an iterator over alist, or nil when alist is nil."
  [^Iterable alist]
  (if alist (.iterator alist)))

(defn iter-has-next?
  "Returns whether iter has more elements; false for a nil iter."
  [^Iterator iter]
  (if iter (.hasNext iter) false))

(defn iter-next
  [^Iterator iter]
  (.next iter))

(defmacro fast-list-iter
  "Iterates over one or more Iterables in lock-step using raw Java iterators,
  binding each element symbol for body. Stops when any iterator is exhausted."
  [pairs & body]
  (let [pairs (partition 2 pairs)
        lists (map second pairs)
        elems (map first pairs)
        iters (map (fn [_] (gensym)) lists)
        bindings (->> (map (fn [i l] [i `(get-iterator ~l)]) iters lists)
                      (apply concat))
        tests (map (fn [i] `(iter-has-next? ~i)) iters)
        assignments (->> (map (fn [e i] [e `(iter-next ~i)]) elems iters)
                         (apply concat))]
    `(let [~@bindings]
       (while (and ~@tests)
         (let [~@assignments]
           ~@body)))))

(defn fast-list-map
  "Applies afn to every element of alist and returns the results in an ArrayList."
  [afn alist]
  (let [ret (ArrayList.)]
    (fast-list-iter [e alist]
                    (.add ret (afn e)))
    ret))

(defmacro fast-list-for
  "fast-list-map with for-like syntax."
  [[e alist] & body]
  `(fast-list-map (fn [~e] ~@body) ~alist))

(defn map-iter
  "Returns an iterator over the entry set of amap, or nil when amap is nil."
  [^Map amap]
  (if amap (-> amap .entrySet .iterator)))

(defn convert-entry
  "Converts a Map$Entry into a [key value] vector."
  [^Map$Entry entry]
  [(.getKey entry) (.getValue entry)])

(defmacro fast-map-iter
  "Iterates over the entries of amap using a raw Java iterator, destructuring
  each [key value] pair with bind."
  [[bind amap] & body]
  `(let [iter# (map-iter ~amap)]
     (while (iter-has-next? iter#)
       (let [entry# (iter-next iter#)
             ~bind (convert-entry entry#)]
         ~@body))))

(defn fast-first
  "Returns the element at index 0 of alist."
  [^List alist]
  (.get alist 0))

(defmacro get-with-default
  "Returns the value stored under key in the mutable map amap; when that value
  is nil or false, evaluates default-val, stores it under key and returns it."
  [amap key default-val]
  `(let [curr# (.get ~amap ~key)]
     (if curr#
       curr#
       (do
         (let [new# ~default-val]
           (.put ~amap ~key new#)
           new#)))))

(defn fast-group-by
  "Groups the elements of alist by (afn element) into a HashMap of ArrayLists."
  [afn alist]
  (let [ret (HashMap.)]
    (fast-list-iter
      [e alist]
      (let [key (afn e)
            ^List curr (get-with-default ret key (ArrayList.))]
        (.add curr e)))
    ret))

(defn new-instance
  "Instantiates klass, which may be a Class or a class name string."
  [klass]
  (let [klass (if (string? klass) (Class/forName klass) klass)]
    (.newInstance klass)))

(defn get-configured-class
  "Instantiates the class configured under config-key in conf, or returns nil
  when nothing is configured."
  [conf config-key]
  (if (.get conf config-key) (new-instance (.get conf config-key)) nil))

(defmacro -<>
  "Threading macro that inserts x at the position marked by the symbol <> in
  each sequential form; a non-sequence form is called with x as its argument."
  ([x] x)
  ([x form] (if (seq? form)
              (with-meta
                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
                  (concat begin [x] end))
                (meta form))
              (list form x)))
  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))

(defn logs-filename
  "Relative path of worker.log for the given storm-id and port."
  [storm-id port]
  (str storm-id file-path-separator port file-path-separator "worker.log"))

(def worker-log-filename-pattern #"^worker.log(.*)")

(defn event-logs-filename
  "Relative path of events.log for the given storm-id and port."
  [storm-id port]
  (str storm-id file-path-separator port file-path-separator "events.log"))

(defn clojure-from-yaml-file
  "Loads yamlFile with a SafeConstructor and returns it as Clojure data. Any
  exception is logged and an empty map is returned."
  [yamlFile]
  (try
    (with-open [reader (java.io.FileReader. yamlFile)]
      (clojurify-structure (.load (Yaml. (SafeConstructor.)) reader)))
    (catch Exception ex
      (log-error ex) {})))

(defn hashmap-to-persistent
  "Copies the entries of the HashMap m into a persistent Clojure map."
  [^HashMap m]
  (zipmap (.keySet m) (.values m)))

(defn retry-on-exception
  "Retries specific function on exception based on retries count.

  Calls (apply f args). When the call throws an Exception and retries is still
  positive, the failure is logged and the call is repeated with retries
  decremented. Once retries is zero or less the exception is rethrown.
  Returns the value of the first successful call."
  [retries task-description f & args]
  (let [res (try {:value (apply f args)}
                 (catch Exception e
                   ;; Give up only when no attempts remain. The comparison was
                   ;; previously inverted (<= 0 retries), which rethrew on the
                   ;; very first failure for any non-negative retry count.
                   (if (<= retries 0)
                     (throw e)
                     {:exception e})))]
    (if (:exception res)
      (do
        (log-error (:exception res) (str "Failed to " task-description ". Will make [" retries "] more attempts."))
        (recur (dec retries) task-description f args))
      (do
        (log-debug (str "Successful " task-description "."))
        (:value res)))))

(defn setup-default-uncaught-exception-handler
  "Set a default uncaught exception handler to handle exceptions not caught in other threads."
  []
  (Thread/setDefaultUncaughtExceptionHandler
    (proxy [Thread$UncaughtExceptionHandler] []
      (uncaughtException [thread thrown]
        (try
          (Utils/handleUncaughtException thrown)
          (catch Error err
            (do
              (log-error err "Received error in main thread.. terminating server...")
              (.exit (Runtime/getRuntime) -2))))))))

(defn redact-value
  "Hides value for k in coll for printing coll safely"
  [coll k]
  (if (contains? coll k)
    (assoc coll k (apply str (repeat (count (coll k)) "#")))
    coll))

(defn- log-thrift-access-base
  [request-id remoteAddress principal operation]
  (str "Request ID: " request-id
       " access from: " remoteAddress
       " principal: " principal
       " operation: " operation))

(defn log-thrift-access
  "Writes a thrift access record, optionally including the storm name and the
  access result, through a new ThriftAccessLogger."
  [request-id remoteAddress principal operation storm-name access-result]
  (doto
    (ThriftAccessLogger.)
    (.log (str (log-thrift-access-base request-id remoteAddress principal operation)
               (if storm-name
                 (str " storm-name: " storm-name) "")
               (if access-result
                 (str " access result: " access-result) "")))))

(defn log-thrift-access-function
  "Writes a thrift access record, optionally including the function name,
  through a new ThriftAccessLogger."
  [request-id remoteAddress principal operation function]
  (doto
    (ThriftAccessLogger.)
    (.log (str (log-thrift-access-base request-id remoteAddress principal operation)
               (if function
                 (str " function: " function) "")))))

(def DISALLOWED-KEY-NAME-STRS #{"/" "." ":" "\\"})

(defn validate-key-name!
  "Throws a RuntimeException when name contains any of
  DISALLOWED-KEY-NAME-STRS or is blank; otherwise returns nil."
  [name]
  (if (some #(.contains name %) DISALLOWED-KEY-NAME-STRS)
    (throw (RuntimeException.
             (str "Key name cannot contain any of the following: " (pr-str DISALLOWED-KEY-NAME-STRS))))
    (if (clojure.string/blank? name)
      ;; The message was previously wrapped in parentheses, which tried to
      ;; call the String as a function and raised ClassCastException instead.
      (throw (RuntimeException.
               "Key name cannot be blank")))))
http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj deleted file mode 100644 index 2b5da55..0000000 --- a/storm-core/src/clj/backtype/storm/zookeeper.clj +++ /dev/null @@ -1,327 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
- -(ns org.apache.storm.zookeeper - (:require [clojure.set :as set]) - (:import [org.apache.curator.retry RetryNTimes] - [org.apache.storm Config]) - (:import [org.apache.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener]) - (:import [org.apache.curator.framework.state ConnectionStateListener]) - (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory]) - (:import [org.apache.curator.framework.recipes.leader LeaderLatch LeaderLatch$State Participant LeaderLatchListener]) - (:import [org.apache.zookeeper ZooKeeper Watcher KeeperException$NoNodeException - ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState - Watcher$Event$EventType KeeperException$NodeExistsException]) - (:import [org.apache.zookeeper.data Stat]) - (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory]) - (:import [java.net InetSocketAddress BindException InetAddress]) - (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) - (:import [java.io File]) - (:import [java.util List Map]) - (:import [org.apache.storm.utils Utils ZookeeperAuthInfo] - (org.apache.storm.blobstore KeyFilter BlobStore)) - (:use [org.apache.storm util log config])) - -(def zk-keeper-states - {Watcher$Event$KeeperState/Disconnected :disconnected - Watcher$Event$KeeperState/SyncConnected :connected - Watcher$Event$KeeperState/AuthFailed :auth-failed - Watcher$Event$KeeperState/Expired :expired}) - -(def zk-event-types - {Watcher$Event$EventType/None :none - Watcher$Event$EventType/NodeCreated :node-created - Watcher$Event$EventType/NodeDeleted :node-deleted - Watcher$Event$EventType/NodeDataChanged :node-data-changed - Watcher$Event$EventType/NodeChildrenChanged :node-children-changed}) - -(defn- default-watcher - [state type path] - (log-message "Zookeeper state update: " state type path)) - -(defnk mk-client - [conf servers port - :root "" - :watcher default-watcher - :auth-conf nil] - (let [fk 
(Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))] - (.. fk - (getCuratorListenable) - (addListener - (reify CuratorListener - (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e] - (when (= (.getType e) CuratorEventType/WATCHED) - (let [^WatchedEvent event (.getWatchedEvent e)] - (watcher (zk-keeper-states (.getState event)) - (zk-event-types (.getType event)) - (.getPath event)))))))) - ;; (.. fk - ;; (getUnhandledErrorListenable) - ;; (addListener - ;; (reify UnhandledErrorListener - ;; (unhandledError [this msg error] - ;; (if (or (exception-cause? InterruptedException error) - ;; (exception-cause? java.nio.channels.ClosedByInterruptException error)) - ;; (do (log-warn-error error "Zookeeper exception " msg) - ;; (let [to-throw (InterruptedException.)] - ;; (.initCause to-throw error) - ;; (throw to-throw) - ;; )) - ;; (do (log-error error "Unrecoverable Zookeeper error " msg) - ;; (halt-process! 1 "Unrecoverable Zookeeper error"))) - ;; )))) - (.start fk) - fk)) - -(def zk-create-modes - {:ephemeral CreateMode/EPHEMERAL - :persistent CreateMode/PERSISTENT - :sequential CreateMode/PERSISTENT_SEQUENTIAL}) - -(defn create-node - ([^CuratorFramework zk ^String path ^bytes data mode acls] - (let [mode (zk-create-modes mode)] - (try - (.. zk (create) (creatingParentsIfNeeded) (withMode mode) (withACL acls) (forPath (normalize-path path) data)) - (catch Exception e (throw (wrap-in-runtime e)))))) - ([^CuratorFramework zk ^String path ^bytes data acls] - (create-node zk path data :persistent acls))) - -(defn exists-node? - [^CuratorFramework zk ^String path watch?] - ((complement nil?) - (try - (if watch? - (.. zk (checkExists) (watched) (forPath (normalize-path path))) - (.. zk (checkExists) (forPath (normalize-path path)))) - (catch Exception e (throw (wrap-in-runtime e)))))) - -(defnk delete-node - [^CuratorFramework zk ^String path] - (let [path (normalize-path path)] - (when (exists-node? 
zk path false) - (try-cause (.. zk (delete) (deletingChildrenIfNeeded) (forPath (normalize-path path))) - (catch KeeperException$NoNodeException e - ;; do nothing - (log-message "exception" e) - ) - (catch Exception e (throw (wrap-in-runtime e))))))) - -(defn mkdirs - [^CuratorFramework zk ^String path acls] - (let [path (normalize-path path)] - (when-not (or (= path "/") (exists-node? zk path false)) - (mkdirs zk (parent-path path) acls) - (try-cause - (create-node zk path (barr 7) :persistent acls) - (catch KeeperException$NodeExistsException e - ;; this can happen when multiple clients doing mkdir at same time - )) - ))) - -(defn sync-path - [^CuratorFramework zk ^String path] - (try - (.. zk (sync) (forPath (normalize-path path))) - (catch Exception e (throw (wrap-in-runtime e))))) - - -(defn add-listener [^CuratorFramework zk ^ConnectionStateListener listener] - (.. zk (getConnectionStateListenable) (addListener listener))) - -(defn get-data - [^CuratorFramework zk ^String path watch?] - (let [path (normalize-path path)] - (try-cause - (if (exists-node? zk path watch?) - (if watch? - (.. zk (getData) (watched) (forPath path)) - (.. zk (getData) (forPath path)))) - (catch KeeperException$NoNodeException e - ;; this is fine b/c we still have a watch from the successful exists call - nil ) - (catch Exception e (throw (wrap-in-runtime e)))))) - -(defn get-data-with-version - [^CuratorFramework zk ^String path watch?] - (let [stats (org.apache.zookeeper.data.Stat. ) - path (normalize-path path)] - (try-cause - (if-let [data - (if (exists-node? zk path watch?) - (if watch? - (.. zk (getData) (watched) (storingStatIn stats) (forPath path)) - (.. zk (getData) (storingStatIn stats) (forPath path))))] - {:data data - :version (.getVersion stats)}) - (catch KeeperException$NoNodeException e - ;; this is fine b/c we still have a watch from the successful exists call - nil )))) - -(defn get-version -[^CuratorFramework zk ^String path watch?] - (if-let [stats - (if watch? 
- (.. zk (checkExists) (watched) (forPath (normalize-path path))) - (.. zk (checkExists) (forPath (normalize-path path))))] - (.getVersion stats) - nil)) - -(defn get-children - [^CuratorFramework zk ^String path watch?] - (try - (if watch? - (.. zk (getChildren) (watched) (forPath (normalize-path path))) - (.. zk (getChildren) (forPath (normalize-path path)))) - (catch Exception e (throw (wrap-in-runtime e))))) - -(defn delete-node-blobstore - "Deletes the state inside the zookeeper for a key, for which the - contents of the key starts with nimbus host port information" - [^CuratorFramework zk ^String parent-path ^String host-port-info] - (let [parent-path (normalize-path parent-path) - child-path-list (if (exists-node? zk parent-path false) - (into [] (get-children zk parent-path false)) - [])] - (doseq [child child-path-list] - (when (.startsWith child host-port-info) - (log-debug "delete-node " "child" child) - (delete-node zk (str parent-path "/" child)))))) - -(defn set-data - [^CuratorFramework zk ^String path ^bytes data] - (try - (.. zk (setData) (forPath (normalize-path path) data)) - (catch Exception e (throw (wrap-in-runtime e))))) - -(defn exists - [^CuratorFramework zk ^String path watch?] - (exists-node? zk path watch?)) - -(defnk mk-inprocess-zookeeper - [localdir :port nil] - (let [localfile (File. localdir) - zk (ZooKeeperServer. localfile localfile 2000) - [retport factory] - (loop [retport (if port port 2000)] - (if-let [factory-tmp - (try-cause - (doto (NIOServerCnxnFactory.) - (.configure (InetSocketAddress. retport) 0)) - (catch BindException e - (when (> (inc retport) (if port port 65535)) - (throw (RuntimeException. 
- "No port is available to launch an inprocess zookeeper.")))))] - [retport factory-tmp] - (recur (inc retport))))] - (log-message "Starting inprocess zookeeper at port " retport " and dir " localdir) - (.startup factory zk) - [retport factory])) - -(defn shutdown-inprocess-zookeeper - [handle] - (.shutdown handle)) - -(defn- to-NimbusInfo [^Participant participant] - (let - [id (if (clojure.string/blank? (.getId participant)) - (throw (RuntimeException. "No nimbus leader participant host found, have you started your nimbus hosts?")) - (.getId participant)) - nimbus-info (NimbusInfo/parse id)] - (.setLeader nimbus-info (.isLeader participant)) - nimbus-info)) - -(defn- code-ids [blob-store] - (let [to-id (reify KeyFilter - (filter [this key] (get-id-from-blob-key key)))] - (set (.filterAndListKeys blob-store to-id)))) - -(defn leader-latch-listener-impl - "Leader latch listener that will be invoked when we either gain or lose leadership" - [conf zk blob-store leader-latch] - (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost)) - STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")] - (reify LeaderLatchListener - (^void isLeader[this] - (log-message (str hostname " gained leadership, checking if it has all the topology code locally.")) - (let [active-topology-ids (set (get-children zk STORMS-ROOT false)) - local-topology-ids (set (code-ids blob-store)) - diff-topology (set/difference active-topology-ids local-topology-ids)] - (log-message "active-topology-ids [" (clojure.string/join "," active-topology-ids) - "] local-topology-ids [" (clojure.string/join "," local-topology-ids) - "] diff-topology [" (clojure.string/join "," diff-topology) "]") - (if (empty? 
diff-topology) - (log-message "Accepting leadership, all active topology found localy.") - (do - (log-message "code for all active topologies not available locally, giving up leadership.") - (.close leader-latch))))) - (^void notLeader[this] - (log-message (str hostname " lost leadership.")))))) - -(defn zk-leader-elector - "Zookeeper Implementation of ILeaderElector." - [conf blob-store] - (let [servers (conf STORM-ZOOKEEPER-SERVERS) - zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf) - leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock") - id (.toHostPortString (NimbusInfo/fromConf conf)) - leader-latch (atom (LeaderLatch. zk leader-lock-path id)) - leader-latch-listener (atom (leader-latch-listener-impl conf zk blob-store @leader-latch)) - ] - (reify ILeaderElector - (prepare [this conf] - (log-message "no-op for zookeeper implementation")) - - (^void addToLeaderLockQueue [this] - ;if this latch is already closed, we need to create new instance. - (if (.equals LeaderLatch$State/CLOSED (.getState @leader-latch)) - (do - (reset! leader-latch (LeaderLatch. zk leader-lock-path id)) - (reset! leader-latch-listener (leader-latch-listener-impl conf zk blob-store @leader-latch)) - (log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.") - )) - - ;Only if the latch is not already started we invoke start. - (if (.equals LeaderLatch$State/LATENT (.getState @leader-latch)) - (do - (.addListener @leader-latch @leader-latch-listener) - (.start @leader-latch) - (log-message "Queued up for leader lock.")) - (log-message "Node already in queue for leader lock."))) - - (^void removeFromLeaderLockQueue [this] - ;Only started latches can be closed. 
- (if (.equals LeaderLatch$State/STARTED (.getState @leader-latch)) - (do - (.close @leader-latch) - (log-message "Removed from leader lock queue.")) - (log-message "leader latch is not started so no removeFromLeaderLockQueue needed."))) - - (^boolean isLeader [this] - (.hasLeadership @leader-latch)) - - (^NimbusInfo getLeader [this] - (to-NimbusInfo (.getLeader @leader-latch))) - - (^List getAllNimbuses [this] - (let [participants (.getParticipants @leader-latch)] - (map (fn [^Participant participant] - (to-NimbusInfo participant)) - participants))) - - (^void close[this] - (log-message "closing zookeeper connection of leader elector.") - (.close zk))))) http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj b/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj index 1f9b306..eda477c 100644 --- a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj +++ b/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj @@ -19,6 +19,7 @@ [org.apache.storm.cluster-state [zookeeper-state-factory :as zk-factory]] [org.apache.storm [config :refer :all] + [converter :as converter] [cluster :refer :all] [log :refer :all] [util :as util]]) @@ -27,7 +28,7 @@ HBMessageData HBPulse ClusterWorkerHeartbeat] [org.apache.storm.cluster_state zookeeper_state_factory] [org.apache.storm.cluster ClusterState] - [backtype.storm.utils Utils] + [org.apache.storm.utils Utils] [org.apache.storm.pacemaker PacemakerClient]) (:gen-class :implements [org.apache.storm.cluster.ClusterStateFactory])) @@ -39,7 +40,7 @@ (defn clojurify-details [details] (if details - (clojurify-zk-worker-hb (maybe-deserialize details ClusterWorkerHeartbeat)))) + (converter/clojurify-zk-worker-hb (maybe-deserialize details 
ClusterWorkerHeartbeat)))) (defn get-wk-hb-time-secs-pair [details-set] (for [details details-set]