http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/util.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/util.clj b/storm-core/src/clj/org/apache/storm/util.clj new file mode 100644 index 0000000..23d39f6 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/util.clj @@ -0,0 +1,1118 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns org.apache.storm.util + (:import [java.net InetAddress]) + (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap]) + (:import [java.io FileReader FileNotFoundException]) + (:import [java.nio.file Paths]) + (:import [org.apache.storm Config]) + (:import [org.apache.storm.utils Time Container ClojureTimerTask Utils + MutableObject MutableInt]) + (:import [org.apache.storm.security.auth NimbusPrincipal]) + (:import [javax.security.auth Subject]) + (:import [java.util UUID Random ArrayList List Collections]) + (:import [java.util.zip ZipFile]) + (:import [java.util.concurrent.locks ReentrantReadWriteLock]) + (:import [java.util.concurrent Semaphore]) + (:import [java.nio.file Files Paths]) + (:import [java.nio.file.attribute FileAttribute]) + (:import [java.io File FileOutputStream RandomAccessFile StringWriter + PrintWriter BufferedReader InputStreamReader IOException]) + (:import [java.lang.management ManagementFactory]) + (:import [org.apache.commons.exec DefaultExecutor CommandLine]) + (:import [org.apache.commons.io FileUtils]) + (:import [org.apache.storm.logging ThriftAccessLogger]) + (:import [org.apache.commons.exec ExecuteException]) + (:import [org.json.simple JSONValue]) + (:import [org.yaml.snakeyaml Yaml] + [org.yaml.snakeyaml.constructor SafeConstructor]) + (:require [clojure [string :as str]]) + (:import [clojure.lang RT]) + (:require [clojure [set :as set]]) + (:require [clojure.java.io :as io]) + (:use [clojure walk]) + (:require [ring.util.codec :as codec]) + (:use [org.apache.storm log])) + +(defn wrap-in-runtime + "Wraps an exception in a RuntimeException if needed" + [^Exception e] + (if (instance? RuntimeException e) + e + (RuntimeException. e))) + +(def on-windows? + (= "Windows_NT" (System/getenv "OS"))) + +(def file-path-separator + (System/getProperty "file.separator")) + +(def class-path-separator + (System/getProperty "path.separator")) + +(defn is-absolute-path? [path] + (.isAbsolute (Paths/get path (into-array String [])))) + +(defmacro defalias + "Defines an alias for a var: a new var with the same root binding (if + any) and similar metadata. The metadata of the alias is its initial + metadata (as provided by def) merged into the metadata of the original." + ([name orig] + `(do + (alter-meta! + (if (.hasRoot (var ~orig)) + (def ~name (.getRawRoot (var ~orig))) + (def ~name)) + ;; When copying metadata, disregard {:macro false}. + ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273 + #(conj (dissoc % :macro) + (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %))))) + (var ~name))) + ([name orig doc] + (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig))) + +;; name-with-attributes by Konrad Hinsen: +(defn name-with-attributes + "To be used in macro definitions. + Handles optional docstrings and attribute maps for a name to be defined + in a list of macro arguments. If the first macro argument is a string, + it is added as a docstring to name and removed from the macro argument + list. If afterwards the first macro argument is a map, its entries are + added to the name's metadata map and the map is removed from the + macro argument list. The return value is a vector containing the name + with its extended metadata map and the list of unprocessed macro + arguments." + [name macro-args] + (let [[docstring macro-args] (if (string? (first macro-args)) + [(first macro-args) (next macro-args)] + [nil macro-args]) + [attr macro-args] (if (map? (first macro-args)) + [(first macro-args) (next macro-args)] + [{} macro-args]) + attr (if docstring + (assoc attr :doc docstring) + attr) + attr (if (meta name) + (conj (meta name) attr) + attr)] + [(with-meta name attr) macro-args])) + +(defmacro defnk + "Define a function accepting keyword arguments. Symbols up to the first + keyword in the parameter list are taken as positional arguments. Then + an alternating sequence of keywords and defaults values is expected. The + values of the keyword arguments are available in the function body by + virtue of the symbol corresponding to the keyword (cf. :keys destructuring). + defnk accepts an optional docstring as well as an optional metadata map." + [fn-name & fn-tail] + (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail) + [pos kw-vals] (split-with symbol? args) + syms (map #(-> % name symbol) (take-nth 2 kw-vals)) + values (take-nth 2 (rest kw-vals)) + sym-vals (apply hash-map (interleave syms values)) + de-map {:keys (vec syms) :or sym-vals}] + `(defn ~fn-name + [~@pos & options#] + (let [~de-map (apply hash-map options#)] + ~@body)))) + +(defn find-first + "Returns the first item of coll for which (pred item) returns logical true. + Consumes sequences up to the first match, will consume the entire sequence + and return nil if no match is found." + [pred coll] + (first (filter pred coll))) + +(defn dissoc-in + "Dissociates an entry from a nested associative structure returning a new + nested structure. keys is a sequence of keys. Any empty maps that result + will not be present in the new structure." + [m [k & ks :as keys]] + (if ks + (if-let [nextmap (get m k)] + (let [newmap (dissoc-in nextmap ks)] + (if (seq newmap) + (assoc m k newmap) + (dissoc m k))) + m) + (dissoc m k))) + +(defn indexed + "Returns a lazy sequence of [index, item] pairs, where items come + from 's' and indexes count up from zero. + + (indexed '(a b c d)) => ([0 a] [1 b] [2 c] [3 d])" + [s] + (map vector (iterate inc 0) s)) + +(defn positions + "Returns a lazy sequence containing the positions at which pred + is true for items in coll." + [pred coll] + (for [[idx elt] (indexed coll) :when (pred elt)] idx)) + +(defn exception-cause? + [klass ^Throwable t] + (->> (iterate #(.getCause ^Throwable %) t) + (take-while identity) + (some (partial instance? klass)) + boolean)) + +(defmacro thrown-cause? + [klass & body] + `(try + ~@body + false + (catch Throwable t# + (exception-cause? ~klass t#)))) + +(defmacro thrown-cause-with-msg? + [klass re & body] + `(try + ~@body + false + (catch Throwable t# + (and (re-matches ~re (.getMessage t#)) + (exception-cause? ~klass t#))))) + +(defmacro forcat + [[args aseq] & body] + `(mapcat (fn [~args] + ~@body) + ~aseq)) + +(defmacro try-cause + [& body] + (let [checker (fn [form] + (or (not (sequential? form)) + (not= 'catch (first form)))) + [code guards] (split-with checker body) + error-local (gensym "t") + guards (forcat [[_ klass local & guard-body] guards] + `((exception-cause? ~klass ~error-local) + (let [~local ~error-local] + ~@guard-body + )))] + `(try ~@code + (catch Throwable ~error-local + (cond ~@guards + true (throw ~error-local) + ))))) + +(defn local-hostname + [] + (.getCanonicalHostName (InetAddress/getLocalHost))) + +(def memoized-local-hostname (memoize local-hostname)) + +;; checks conf for STORM_LOCAL_HOSTNAME. +;; when unconfigured, falls back to (memoized) guess by `local-hostname`. +(defn hostname + [conf] + (conf Config/STORM_LOCAL_HOSTNAME (memoized-local-hostname))) + +(letfn [(try-port [port] + (with-open [socket (java.net.ServerSocket. port)] + (.getLocalPort socket)))] + (defn available-port + ([] (try-port 0)) + ([preferred] + (try + (try-port preferred) + (catch java.io.IOException e + (available-port)))))) + +(defn uuid [] + (str (UUID/randomUUID))) + +(defn current-time-secs + [] + (Time/currentTimeSecs)) + +(defn current-time-millis + [] + (Time/currentTimeMillis)) + +(defn secs-to-millis-long + [secs] + (long (* (long 1000) secs))) + +(defn clojurify-structure + [s] + (prewalk (fn [x] + (cond (instance? Map x) (into {} x) + (instance? List x) (vec x) + ;; (Boolean. false) does not evaluate to false in an if. + ;; This fixes that. + (instance? Boolean x) (boolean x) + true x)) + s)) + +(defmacro with-file-lock + [path & body] + `(let [f# (File. ~path) + _# (.createNewFile f#) + rf# (RandomAccessFile. f# "rw") + lock# (.. rf# (getChannel) (lock))] + (try + ~@body + (finally + (.release lock#) + (.close rf#))))) + +(defn tokenize-path + [^String path] + (let [toks (.split path "/")] + (vec (filter (complement empty?) toks)))) + +(defn assoc-conj + [m k v] + (merge-with concat m {k [v]})) + +;; returns [ones in first set not in second, ones in second set not in first] +(defn set-delta + [old curr] + (let [s1 (set old) + s2 (set curr)] + [(set/difference s1 s2) (set/difference s2 s1)])) + +(defn parent-path + [path] + (let [toks (tokenize-path path)] + (str "/" (str/join "/" (butlast toks))))) + +(defn toks->path + [toks] + (str "/" (str/join "/" toks))) + +(defn normalize-path + [^String path] + (toks->path (tokenize-path path))) + +(defn map-val + [afn amap] + (into {} + (for [[k v] amap] + [k (afn v)]))) + +(defn filter-val + [afn amap] + (into {} (filter (fn [[k v]] (afn v)) amap))) + +(defn filter-key + [afn amap] + (into {} (filter (fn [[k v]] (afn k)) amap))) + +(defn map-key + [afn amap] + (into {} (for [[k v] amap] [(afn k) v]))) + +(defn separate + [pred aseq] + [(filter pred aseq) (filter (complement pred) aseq)]) + +(defn full-path + [parent name] + (let [toks (tokenize-path parent)] + (toks->path (conj toks name)))) + +(def not-nil? (complement nil?)) + +(defn barr + [& vals] + (byte-array (map byte vals))) + +(defn exit-process! + [val & msg] + (log-error (RuntimeException. (str msg)) "Halting process: " msg) + (.exit (Runtime/getRuntime) val)) + +(defn sum + [vals] + (reduce + vals)) + +(defn repeat-seq + ([aseq] + (apply concat (repeat aseq))) + ([amt aseq] + (apply concat (repeat amt aseq)))) + +(defn div + "Perform floating point division on the arguments." + [f & rest] + (apply / (double f) rest)) + +(defn defaulted + [val default] + (if val val default)) + +(defn mk-counter + ([] (mk-counter 1)) + ([start-val] + (let [val (atom (dec start-val))] + (fn [] (swap! val inc))))) + +(defmacro for-times [times & body] + `(for [i# (range ~times)] + ~@body)) + +(defmacro dofor [& body] + `(doall (for ~@body))) + +(defn reverse-map + "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}" + [amap] + (reduce (fn [m [k v]] + (let [existing (get m v [])] + (assoc m v (conj existing k)))) + {} amap)) + +(defmacro print-vars [& vars] + (let [prints (for [v vars] `(println ~(str v) ~v))] + `(do ~@prints))) + +(defn process-pid + "Gets the pid of this JVM. Hacky because Java doesn't provide a real way to do this." + [] + (let [name (.getName (ManagementFactory/getRuntimeMXBean)) + split (.split name "@")] + (when-not (= 2 (count split)) + (throw (RuntimeException. (str "Got unexpected process name: " name)))) + (first split))) + +(defn exec-command! [command] + (let [[comm-str & args] (seq (.split command " ")) + command (CommandLine. comm-str)] + (doseq [a args] + (.addArgument command a)) + (.execute (DefaultExecutor.) command))) + +(defn extract-dir-from-jar [jarpath dir destdir] + (try-cause + (with-open [jarpath (ZipFile. jarpath)] + (let [entries (enumeration-seq (.entries jarpath))] + (doseq [file (filter (fn [entry](and (not (.isDirectory entry)) (.startsWith (.getName entry) dir))) entries)] + (.mkdirs (.getParentFile (File. destdir (.getName file)))) + (with-open [out (FileOutputStream. (File. destdir (.getName file)))] + (io/copy (.getInputStream jarpath file) out))))) + (catch IOException e + (log-message "Could not extract " dir " from " jarpath)))) + +(defn sleep-secs [secs] + (when (pos? secs) + (Time/sleep (* (long secs) 1000)))) + +(defn sleep-until-secs [target-secs] + (Time/sleepUntil (* (long target-secs) 1000))) + +(def ^:const sig-kill 9) + +(def ^:const sig-term 15) + +(defn send-signal-to-process + [pid signum] + (try-cause + (exec-command! (str (if on-windows? + (if (== signum sig-kill) "taskkill /f /pid " "taskkill /pid ") + (str "kill -" signum " ")) + pid)) + (catch ExecuteException e + (log-message "Error when trying to kill " pid ". Process is probably already dead.")))) + +(defn read-and-log-stream + [prefix stream] + (try + (let [reader (BufferedReader. (InputStreamReader. stream))] + (loop [] + (if-let [line (.readLine reader)] + (do + (log-warn (str prefix ":" line)) + (recur))))) + (catch IOException e + (log-warn "Error while trying to log stream" e)))) + +(defn force-kill-process + [pid] + (send-signal-to-process pid sig-kill)) + +(defn kill-process-with-sig-term + [pid] + (send-signal-to-process pid sig-term)) + +(defn add-shutdown-hook-with-force-kill-in-1-sec + "adds the user supplied function as a shutdown hook for cleanup. + Also adds a function that sleeps for a second and then sends kill -9 to process to avoid any zombie process in case + cleanup function hangs." + [func] + (.addShutdownHook (Runtime/getRuntime) (Thread. #(func))) + (.addShutdownHook (Runtime/getRuntime) (Thread. #((sleep-secs 1) + (.halt (Runtime/getRuntime) 20))))) + +(defprotocol SmartThread + (start [this]) + (join [this]) + (interrupt [this]) + (sleeping? [this])) + +;; afn returns amount of time to sleep +(defnk async-loop [afn + :daemon false + :kill-fn (fn [error] (exit-process! 1 "Async loop died!")) + :priority Thread/NORM_PRIORITY + :factory? false + :start true + :thread-name nil] + (let [thread (Thread. + (fn [] + (try-cause + (let [afn (if factory? (afn) afn)] + (loop [] + (let [sleep-time (afn)] + (when-not (nil? sleep-time) + (sleep-secs sleep-time) + (recur)) + ))) + (catch InterruptedException e + (log-message "Async loop interrupted!") + ) + (catch Throwable t + (log-error t "Async loop died!") + (kill-fn t)))))] + (.setDaemon thread daemon) + (.setPriority thread priority) + (when thread-name + (.setName thread (str (.getName thread) "-" thread-name))) + (when start + (.start thread)) + ;; should return object that supports stop, interrupt, join, and waiting? + (reify SmartThread + (start + [this] + (.start thread)) + (join + [this] + (.join thread)) + (interrupt + [this] + (.interrupt thread)) + (sleeping? + [this] + (Time/isThreadWaiting thread))))) + +(defn shell-cmd + [command] + (->> command + (map #(str \' (clojure.string/escape % {\' "'\"'\"'"}) \')) + (clojure.string/join " "))) + +(defn script-file-path [dir] + (str dir file-path-separator "storm-worker-script.sh")) + +(defn container-file-path [dir] + (str dir file-path-separator "launch_container.sh")) + +(defnk write-script + [dir command :environment {}] + (let [script-src (str "#!/bin/bash\n" (clojure.string/join "" (map (fn [[k v]] (str (shell-cmd ["export" (str k "=" v)]) ";\n")) environment)) "\nexec " (shell-cmd command) ";") + script-path (script-file-path dir) + _ (spit script-path script-src)] + script-path + )) + +(defnk launch-process + [command :environment {} :log-prefix nil :exit-code-callback nil :directory nil] + (let [builder (ProcessBuilder. command) + process-env (.environment builder)] + (when directory (.directory builder directory)) + (.redirectErrorStream builder true) + (doseq [[k v] environment] + (.put process-env k v)) + (let [process (.start builder)] + (if (or log-prefix exit-code-callback) + (async-loop + (fn [] + (if log-prefix + (read-and-log-stream log-prefix (.getInputStream process))) + (when exit-code-callback + (try + (.waitFor process) + (catch InterruptedException e + (log-message log-prefix " interrupted."))) + (exit-code-callback (.exitValue process))) + nil))) + process))) + +(defn exists-file? + [path] + (.exists (File. path))) + +(defn rmr + [path] + (log-debug "Rmr path " path) + (when (exists-file? path) + (try + (FileUtils/forceDelete (File. path)) + (catch FileNotFoundException e)))) + +(defn rmpath + "Removes file or directory at the path. Not recursive. Throws exception on failure" + [path] + (log-debug "Removing path " path) + (when (exists-file? path) + (let [deleted? (.delete (File. path))] + (when-not deleted? + (throw (RuntimeException. (str "Failed to delete " path))))))) + +(defn local-mkdirs + [path] + (log-debug "Making dirs at " path) + (FileUtils/forceMkdir (File. path))) + +(defn touch + [path] + (log-debug "Touching file at " path) + (let [success? (do (if on-windows? (.mkdirs (.getParentFile (File. path)))) + (.createNewFile (File. path)))] + (when-not success? + (throw (RuntimeException. (str "Failed to touch " path)))))) + +(defn create-symlink! + "Create symlink is to the target" + ([path-dir target-dir file-name] + (create-symlink! path-dir target-dir file-name file-name)) + ([path-dir target-dir from-file-name to-file-name] + (let [path (str path-dir file-path-separator from-file-name) + target (str target-dir file-path-separator to-file-name) + empty-array (make-array String 0) + attrs (make-array FileAttribute 0) + abs-path (.toAbsolutePath (Paths/get path empty-array)) + abs-target (.toAbsolutePath (Paths/get target empty-array))] + (log-debug "Creating symlink [" abs-path "] to [" abs-target "]") + (if (not (.exists (.toFile abs-path))) + (Files/createSymbolicLink abs-path abs-target attrs))))) + +(defn read-dir-contents + [dir] + (if (exists-file? dir) + (let [content-files (.listFiles (File. dir))] + (map #(.getName ^File %) content-files)) + [])) + +(defn compact + [aseq] + (filter (complement nil?) aseq)) + +(defn current-classpath + [] + (System/getProperty "java.class.path")) + +(defn get-full-jars + [dir] + (map #(str dir file-path-separator %) (filter #(.endsWith % ".jar") (read-dir-contents dir)))) + +(defn worker-classpath + [] + (let [storm-dir (System/getProperty "storm.home") + storm-lib-dir (str storm-dir file-path-separator "lib") + storm-conf-dir (if-let [confdir (System/getenv "STORM_CONF_DIR")] + confdir + (str storm-dir file-path-separator "conf")) + storm-extlib-dir (str storm-dir file-path-separator "extlib") + extcp (System/getenv "STORM_EXT_CLASSPATH")] + (if (nil? storm-dir) + (current-classpath) + (str/join class-path-separator + (remove nil? (concat (get-full-jars storm-lib-dir) (get-full-jars storm-extlib-dir) [extcp] [storm-conf-dir])))))) + +(defn add-to-classpath + [classpath paths] + (if (empty? paths) + classpath + (str/join class-path-separator (cons classpath paths)))) + +(defn ^ReentrantReadWriteLock mk-rw-lock + [] + (ReentrantReadWriteLock.)) + +(defmacro read-locked + [rw-lock & body] + (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})] + `(let [rlock# (.readLock ~lock)] + (try (.lock rlock#) + ~@body + (finally (.unlock rlock#)))))) + +(defmacro write-locked + [rw-lock & body] + (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})] + `(let [wlock# (.writeLock ~lock)] + (try (.lock wlock#) + ~@body + (finally (.unlock wlock#)))))) + +(defn time-delta + [time-secs] + (- (current-time-secs) time-secs)) + +(defn time-delta-ms + [time-ms] + (- (System/currentTimeMillis) (long time-ms))) + +(defn parse-int + [str] + (Integer/valueOf str)) + +(defn integer-divided + [sum num-pieces] + (clojurify-structure (Utils/integerDivided sum num-pieces))) + +(defn collectify + [obj] + (if (or (sequential? obj) (instance? Collection obj)) + obj + [obj])) + +(defn to-json + [obj] + (JSONValue/toJSONString obj)) + +(defn from-json + [^String str] + (if str + (clojurify-structure + (JSONValue/parse str)) + nil)) + +(defmacro letlocals + [& body] + (let [[tobind lexpr] (split-at (dec (count body)) body) + binded (vec (mapcat (fn [e] + (if (and (list? e) (= 'bind (first e))) + [(second e) (last e)] + ['_ e] + )) + tobind))] + `(let ~binded + ~(first lexpr)))) + +(defn remove-first + [pred aseq] + (let [[b e] (split-with (complement pred) aseq)] + (when (empty? e) + (throw (IllegalArgumentException. "Nothing to remove"))) + (concat b (rest e)))) + +(defn assoc-non-nil + [m k v] + (if v (assoc m k v) m)) + +(defn multi-set + "Returns a map of elem to count" + [aseq] + (apply merge-with + + (map #(hash-map % 1) aseq))) + +(defn set-var-root* + [avar val] + (alter-var-root avar (fn [avar] val))) + +(defmacro set-var-root + [var-sym val] + `(set-var-root* (var ~var-sym) ~val)) + +(defmacro with-var-roots + [bindings & body] + (let [settings (partition 2 bindings) + tmpvars (repeatedly (count settings) (partial gensym "old")) + vars (map first settings) + savevals (vec (mapcat (fn [t v] [t v]) tmpvars vars)) + setters (for [[v s] settings] `(set-var-root ~v ~s)) + restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)] + `(let ~savevals + ~@setters + (try + ~@body + (finally + ~@restorers))))) + +(defn map-diff + "Returns mappings in m2 that aren't in m1" + [m1 m2] + (into {} (filter (fn [[k v]] (not= v (m1 k))) m2))) + +(defn select-keys-pred + [pred amap] + (into {} (filter (fn [[k v]] (pred k)) amap))) + +(defn rotating-random-range + [choices] + (let [rand (Random.) + choices (ArrayList. choices)] + (Collections/shuffle choices rand) + [(MutableInt. -1) choices rand])) + +(defn acquire-random-range-id + [[^MutableInt curr ^List state ^Random rand]] + (when (>= (.increment curr) (.size state)) + (.set curr 0) + (Collections/shuffle state rand)) + (.get state (.get curr))) + +; this can be rewritten to be tail recursive +(defn interleave-all + [& colls] + (if (empty? colls) + [] + (let [colls (filter (complement empty?) colls) + my-elems (map first colls) + rest-elems (apply interleave-all (map rest colls))] + (concat my-elems rest-elems)))) + +(defn any-intersection + [& sets] + (let [elem->count (multi-set (apply concat sets))] + (-> (filter-val #(> % 1) elem->count) + keys))) + +(defn between? + "val >= lower and val <= upper" + [val lower upper] + (and (>= val lower) + (<= val upper))) + +(defmacro benchmark + [& body] + `(let [l# (doall (range 1000000))] + (time + (doseq [i# l#] + ~@body)))) + +(defn rand-sampler + [freq] + (let [r (java.util.Random.)] + (fn [] (= 0 (.nextInt r freq))))) + +(defn even-sampler + [freq] + (let [freq (int freq) + start (int 0) + r (java.util.Random.) + curr (MutableInt. -1) + target (MutableInt. (.nextInt r freq))] + (with-meta + (fn [] + (let [i (.increment curr)] + (when (>= i freq) + (.set curr start) + (.set target (.nextInt r freq)))) + (= (.get curr) (.get target))) + {:rate freq}))) + +(defn sampler-rate + [sampler] + (:rate (meta sampler))) + +(defn class-selector + [obj & args] + (class obj)) + +(defn uptime-computer [] + (let [start-time (current-time-secs)] + (fn [] (time-delta start-time)))) + +(defn stringify-error [error] + (let [result (StringWriter.) + printer (PrintWriter. result)] + (.printStackTrace error printer) + (.toString result))) + +(defn nil-to-zero + [v] + (or v 0)) + +(defn bit-xor-vals + [vals] + (reduce bit-xor 0 vals)) + +(defmacro with-error-reaction + [afn & body] + `(try ~@body + (catch Throwable t# (~afn t#)))) + +(defn container + [] + (Container.)) + +(defn container-set! [^Container container obj] + (set! (. container object) obj) + container) + +(defn container-get [^Container container] + (. container object)) + +(defn to-millis [secs] + (* 1000 (long secs))) + +(defn throw-runtime [& strs] + (throw (RuntimeException. (apply str strs)))) + +(defn redirect-stdio-to-slf4j! + [] + ;; set-var-root doesn't work with *out* and *err*, so digging much deeper here + ;; Unfortunately, this code seems to work at the REPL but not when spawned as worker processes + ;; it might have something to do with being a child process + ;; (set! (. (.getThreadBinding RT/OUT) val) + ;; (java.io.OutputStreamWriter. + ;; (log-stream :info "STDIO"))) + ;; (set! (. (.getThreadBinding RT/ERR) val) + ;; (PrintWriter. + ;; (java.io.OutputStreamWriter. + ;; (log-stream :error "STDIO")) + ;; true)) + (log-capture! "STDIO")) + +(defn spy + [prefix val] + (log-message prefix ": " val) + val) + +(defn zip-contains-dir? + [zipfile target] + (let [entries (->> zipfile (ZipFile.) .entries enumeration-seq (map (memfn getName)))] + (boolean (some #(.startsWith % (str target "/")) entries)))) + +(defn url-encode + [s] + (codec/url-encode s)) + +(defn url-decode + [s] + (codec/url-decode s)) + +(defn join-maps + [& maps] + (let [all-keys (apply set/union (for [m maps] (-> m keys set)))] + (into {} (for [k all-keys] + [k (for [m maps] (m k))])))) + +(defn partition-fixed + [max-num-chunks aseq] + (if (zero? max-num-chunks) + [] + (let [chunks (->> (integer-divided (count aseq) max-num-chunks) + (#(dissoc % 0)) + (sort-by (comp - first)) + (mapcat (fn [[size amt]] (repeat amt size))) + )] + (loop [result [] + [chunk & rest-chunks] chunks + data aseq] + (if (nil? chunk) + result + (let [[c rest-data] (split-at chunk data)] + (recur (conj result c) + rest-chunks + rest-data))))))) + + +(defn assoc-apply-self + [curr key afn] + (assoc curr key (afn curr))) + +(defmacro recursive-map + [& forms] + (->> (partition 2 forms) + (map (fn [[key form]] `(assoc-apply-self ~key (fn [~'<>] ~form)))) + (concat `(-> {})))) + +(defn current-stack-trace + [] + (->> (Thread/currentThread) + .getStackTrace + (map str) + (str/join "\n"))) + +(defn get-iterator + [^Iterable alist] + (if alist (.iterator alist))) + +(defn iter-has-next? + [^Iterator iter] + (if iter (.hasNext iter) false)) + +(defn iter-next + [^Iterator iter] + (.next iter)) + +(defmacro fast-list-iter + [pairs & body] + (let [pairs (partition 2 pairs) + lists (map second pairs) + elems (map first pairs) + iters (map (fn [_] (gensym)) lists) + bindings (->> (map (fn [i l] [i `(get-iterator ~l)]) iters lists) + (apply concat)) + tests (map (fn [i] `(iter-has-next? ~i)) iters) + assignments (->> (map (fn [e i] [e `(iter-next ~i)]) elems iters) + (apply concat))] + `(let [~@bindings] + (while (and ~@tests) + (let [~@assignments] + ~@body))))) + +(defn fast-list-map + [afn alist] + (let [ret (ArrayList.)] + (fast-list-iter [e alist] + (.add ret (afn e))) + ret)) + +(defmacro fast-list-for + [[e alist] & body] + `(fast-list-map (fn [~e] ~@body) ~alist)) + +(defn map-iter + [^Map amap] + (if amap (-> amap .entrySet .iterator))) + +(defn convert-entry + [^Map$Entry entry] + [(.getKey entry) (.getValue entry)]) + +(defmacro fast-map-iter + [[bind amap] & body] + `(let [iter# (map-iter ~amap)] + (while (iter-has-next? iter#) + (let [entry# (iter-next iter#) + ~bind (convert-entry entry#)] + ~@body)))) + +(defn fast-first + [^List alist] + (.get alist 0)) + +(defmacro get-with-default + [amap key default-val] + `(let [curr# (.get ~amap ~key)] + (if curr# + curr# + (do + (let [new# ~default-val] + (.put ~amap ~key new#) + new#))))) + +(defn fast-group-by + [afn alist] + (let [ret (HashMap.)] + (fast-list-iter + [e alist] + (let [key (afn e) + ^List curr (get-with-default ret key (ArrayList.))] + (.add curr e))) + ret)) + +(defn new-instance + [klass] + (let [klass (if (string? klass) (Class/forName klass) klass)] + (.newInstance klass))) + +(defn get-configured-class + [conf config-key] + (if (.get conf config-key) (new-instance (.get conf config-key)) nil)) + +(defmacro -<> + ([x] x) + ([x form] (if (seq? form) + (with-meta + (let [[begin [_ & end]] (split-with #(not= % '<>) form)] + (concat begin [x] end)) + (meta form)) + (list form x))) + ([x form & more] `(-<> (-<> ~x ~form) ~@more))) + +(defn logs-filename + [storm-id port] + (str storm-id file-path-separator port file-path-separator "worker.log")) + +(def worker-log-filename-pattern #"^worker.log(.*)") + +(defn event-logs-filename + [storm-id port] + (str storm-id file-path-separator port file-path-separator "events.log")) + +(defn clojure-from-yaml-file [yamlFile] + (try + (with-open [reader (java.io.FileReader. yamlFile)] + (clojurify-structure (.load (Yaml. (SafeConstructor.)) reader))) + (catch Exception ex + (log-error ex)))) + +(defn hashmap-to-persistent [^HashMap m] + (zipmap (.keySet m) (.values m))) + +(defn retry-on-exception + "Retries specific function on exception based on retries count" + [retries task-description f & args] + (let [res (try {:value (apply f args)} + (catch Exception e + (if (<= 0 retries) + (throw e) + {:exception e})))] + (if (:exception res) + (do + (log-error (:exception res) (str "Failed to " task-description ". Will make [" retries "] more attempts.")) + (recur (dec retries) task-description f args)) + (do + (log-debug (str "Successful " task-description ".")) + (:value res))))) + +(defn setup-default-uncaught-exception-handler + "Set a default uncaught exception handler to handle exceptions not caught in other threads." + [] + (Thread/setDefaultUncaughtExceptionHandler + (proxy [Thread$UncaughtExceptionHandler] [] + (uncaughtException [thread thrown] + (try + (Utils/handleUncaughtException thrown) + (catch Error err + (do + (log-error err "Received error in main thread.. terminating server...") + (.exit (Runtime/getRuntime) -2)))))))) + +(defn redact-value + "Hides value for k in coll for printing coll safely" + [coll k] + (if (contains? coll k) + (assoc coll k (apply str (repeat (count (coll k)) "#"))) + coll)) + +(defn log-thrift-access + [request-id remoteAddress principal operation] + (doto + (ThriftAccessLogger.) + (.log (str "Request ID: " request-id " access from: " remoteAddress " principal: " principal " operation: " operation)))) + +(def DISALLOWED-KEY-NAME-STRS #{"/" "." ":" "\\"}) + +(defn validate-key-name! + [name] + (if (some #(.contains name %) DISALLOWED-KEY-NAME-STRS) + (throw (RuntimeException. + (str "Key name cannot contain any of the following: " (pr-str DISALLOWED-KEY-NAME-STRS)))) + (if (clojure.string/blank? name) + (throw (RuntimeException. + ("Key name cannot be blank"))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj new file mode 100644 index 0000000..8a223cd --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj @@ -0,0 +1,308 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns org.apache.storm.zookeeper + (:import [org.apache.curator.retry RetryNTimes] + [org.apache.storm Config]) + (:import [org.apache.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener]) + (:import [org.apache.curator.framework.state ConnectionStateListener]) + (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory]) + (:import [org.apache.curator.framework.recipes.leader LeaderLatch LeaderLatch$State Participant LeaderLatchListener]) + (:import [org.apache.zookeeper ZooKeeper Watcher KeeperException$NoNodeException + ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState + Watcher$Event$EventType KeeperException$NodeExistsException]) + (:import [org.apache.zookeeper.data Stat]) + (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory]) + (:import [java.net InetSocketAddress BindException InetAddress]) + (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) + (:import [java.io File]) + (:import [java.util List Map]) + (:import [org.apache.storm.utils Utils ZookeeperAuthInfo]) + (:use [org.apache.storm util log config])) + +(def zk-keeper-states + {Watcher$Event$KeeperState/Disconnected :disconnected + Watcher$Event$KeeperState/SyncConnected :connected + Watcher$Event$KeeperState/AuthFailed :auth-failed + Watcher$Event$KeeperState/Expired :expired}) + +(def zk-event-types + {Watcher$Event$EventType/None :none + Watcher$Event$EventType/NodeCreated :node-created + Watcher$Event$EventType/NodeDeleted :node-deleted + Watcher$Event$EventType/NodeDataChanged :node-data-changed + Watcher$Event$EventType/NodeChildrenChanged :node-children-changed}) + +(defn- default-watcher + [state type path] + (log-message "Zookeeper state update: " state type path)) + +(defnk mk-client + [conf servers port + :root "" + :watcher default-watcher + :auth-conf nil] + (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))] + (.. fk + (getCuratorListenable) + (addListener + (reify CuratorListener + (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e] + (when (= (.getType e) CuratorEventType/WATCHED) + (let [^WatchedEvent event (.getWatchedEvent e)] + (watcher (zk-keeper-states (.getState event)) + (zk-event-types (.getType event)) + (.getPath event)))))))) + ;; (.. fk + ;; (getUnhandledErrorListenable) + ;; (addListener + ;; (reify UnhandledErrorListener + ;; (unhandledError [this msg error] + ;; (if (or (exception-cause? InterruptedException error) + ;; (exception-cause? java.nio.channels.ClosedByInterruptException error)) + ;; (do (log-warn-error error "Zookeeper exception " msg) + ;; (let [to-throw (InterruptedException.)] + ;; (.initCause to-throw error) + ;; (throw to-throw) + ;; )) + ;; (do (log-error error "Unrecoverable Zookeeper error " msg) + ;; (halt-process! 1 "Unrecoverable Zookeeper error"))) + ;; )))) + (.start fk) + fk)) + +(def zk-create-modes + {:ephemeral CreateMode/EPHEMERAL + :persistent CreateMode/PERSISTENT + :sequential CreateMode/PERSISTENT_SEQUENTIAL}) + +(defn create-node + ([^CuratorFramework zk ^String path ^bytes data mode acls] + (let [mode (zk-create-modes mode)] + (try + (.. zk (create) (creatingParentsIfNeeded) (withMode mode) (withACL acls) (forPath (normalize-path path) data)) + (catch Exception e (throw (wrap-in-runtime e)))))) + ([^CuratorFramework zk ^String path ^bytes data acls] + (create-node zk path data :persistent acls))) + +(defn exists-node? + [^CuratorFramework zk ^String path watch?] + ((complement nil?) + (try + (if watch? + (.. zk (checkExists) (watched) (forPath (normalize-path path))) + (.. zk (checkExists) (forPath (normalize-path path)))) + (catch Exception e (throw (wrap-in-runtime e)))))) + +(defnk delete-node + [^CuratorFramework zk ^String path] + (let [path (normalize-path path)] + (when (exists-node? zk path false) + (try-cause (.. zk (delete) (deletingChildrenIfNeeded) (forPath (normalize-path path))) + (catch KeeperException$NoNodeException e + ;; do nothing + (log-message "exception" e) + ) + (catch Exception e (throw (wrap-in-runtime e))))))) + +(defn mkdirs + [^CuratorFramework zk ^String path acls] + (let [path (normalize-path path)] + (when-not (or (= path "/") (exists-node? zk path false)) + (mkdirs zk (parent-path path) acls) + (try-cause + (create-node zk path (barr 7) :persistent acls) + (catch KeeperException$NodeExistsException e + ;; this can happen when multiple clients doing mkdir at same time + )) + ))) + +(defn sync-path + [^CuratorFramework zk ^String path] + (try + (.. zk (sync) (forPath (normalize-path path))) + (catch Exception e (throw (wrap-in-runtime e))))) + + +(defn add-listener [^CuratorFramework zk ^ConnectionStateListener listener] + (.. zk (getConnectionStateListenable) (addListener listener))) + +(defn get-data + [^CuratorFramework zk ^String path watch?] + (let [path (normalize-path path)] + (try-cause + (if (exists-node? zk path watch?) + (if watch? + (.. zk (getData) (watched) (forPath path)) + (.. zk (getData) (forPath path)))) + (catch KeeperException$NoNodeException e + ;; this is fine b/c we still have a watch from the successful exists call + nil ) + (catch Exception e (throw (wrap-in-runtime e)))))) + +(defn get-data-with-version + [^CuratorFramework zk ^String path watch?] + (let [stats (org.apache.zookeeper.data.Stat. ) + path (normalize-path path)] + (try-cause + (if-let [data + (if (exists-node? zk path watch?) + (if watch? + (.. zk (getData) (watched) (storingStatIn stats) (forPath path)) + (.. zk (getData) (storingStatIn stats) (forPath path))))] + {:data data + :version (.getVersion stats)}) + (catch KeeperException$NoNodeException e + ;; this is fine b/c we still have a watch from the successful exists call + nil )))) + +(defn get-version +[^CuratorFramework zk ^String path watch?] + (if-let [stats + (if watch? + (.. zk (checkExists) (watched) (forPath (normalize-path path))) + (.. zk (checkExists) (forPath (normalize-path path))))] + (.getVersion stats) + nil)) + +(defn get-children + [^CuratorFramework zk ^String path watch?] + (try + (if watch? + (.. zk (getChildren) (watched) (forPath (normalize-path path))) + (.. zk (getChildren) (forPath (normalize-path path)))) + (catch Exception e (throw (wrap-in-runtime e))))) + +(defn delete-node-blobstore + "Deletes the state inside the zookeeper for a key, for which the + contents of the key starts with nimbus host port information" + [^CuratorFramework zk ^String parent-path ^String host-port-info] + (let [parent-path (normalize-path parent-path) + child-path-list (if (exists-node? zk parent-path false) + (into [] (get-children zk parent-path false)) + [])] + (doseq [child child-path-list] + (when (.startsWith child host-port-info) + (log-debug "delete-node " "child" child) + (delete-node zk (str parent-path "/" child)))))) + +(defn set-data + [^CuratorFramework zk ^String path ^bytes data] + (try + (.. zk (setData) (forPath (normalize-path path) data)) + (catch Exception e (throw (wrap-in-runtime e))))) + +(defn exists + [^CuratorFramework zk ^String path watch?] + (exists-node? zk path watch?)) + +(defnk mk-inprocess-zookeeper + [localdir :port nil] + (let [localfile (File. localdir) + zk (ZooKeeperServer. localfile localfile 2000) + [retport factory] + (loop [retport (if port port 2000)] + (if-let [factory-tmp + (try-cause + (doto (NIOServerCnxnFactory.) + (.configure (InetSocketAddress. retport) 0)) + (catch BindException e + (when (> (inc retport) (if port port 65535)) + (throw (RuntimeException. + "No port is available to launch an inprocess zookeeper.")))))] + [retport factory-tmp] + (recur (inc retport))))] + (log-message "Starting inprocess zookeeper at port " retport " and dir " localdir) + (.startup factory zk) + [retport factory])) + +(defn shutdown-inprocess-zookeeper + [handle] + (.shutdown handle)) + +(defn- to-NimbusInfo [^Participant participant] + (let + [id (if (clojure.string/blank? (.getId participant)) + (throw (RuntimeException. "No nimbus leader participant host found, have you started your nimbus hosts?")) + (.getId participant)) + nimbus-info (NimbusInfo/parse id)] + (.setLeader nimbus-info (.isLeader participant)) + nimbus-info)) + +(defn leader-latch-listener-impl + "Leader latch listener that will be invoked when we either gain or lose leadership" + [conf zk leader-latch] + (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))] + (reify LeaderLatchListener + (^void isLeader[this] + (log-message (str hostname " gained leadership"))) + (^void notLeader[this] + (log-message (str hostname " lost leadership.")))))) + +(defn zk-leader-elector + "Zookeeper Implementation of ILeaderElector." + [conf] + (let [servers (conf STORM-ZOOKEEPER-SERVERS) + zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf) + leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock") + id (.toHostPortString (NimbusInfo/fromConf conf)) + leader-latch (atom (LeaderLatch. zk leader-lock-path id)) + leader-latch-listener (atom (leader-latch-listener-impl conf zk @leader-latch)) + ] + (reify ILeaderElector + (prepare [this conf] + (log-message "no-op for zookeeper implementation")) + + (^void addToLeaderLockQueue [this] + ;if this latch is already closed, we need to create new instance. + (if (.equals LeaderLatch$State/CLOSED (.getState @leader-latch)) + (do + (reset! leader-latch (LeaderLatch. zk leader-lock-path id)) + (reset! leader-latch-listener (leader-latch-listener-impl conf zk @leader-latch)) + (log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.") + )) + + ;Only if the latch is not already started we invoke start. + (if (.equals LeaderLatch$State/LATENT (.getState @leader-latch)) + (do + (.addListener @leader-latch @leader-latch-listener) + (.start @leader-latch) + (log-message "Queued up for leader lock.")) + (log-message "Node already in queue for leader lock."))) + + (^void removeFromLeaderLockQueue [this] + ;Only started latches can be closed. + (if (.equals LeaderLatch$State/STARTED (.getState @leader-latch)) + (do + (.close @leader-latch) + (log-message "Removed from leader lock queue.")) + (log-message "leader latch is not started so no removeFromLeaderLockQueue needed."))) + + (^boolean isLeader [this] + (.hasLeadership @leader-latch)) + + (^NimbusInfo getLeader [this] + (to-NimbusInfo (.getLeader @leader-latch))) + + (^List getAllNimbuses [this] + (let [participants (.getParticipants @leader-latch)] + (map (fn [^Participant participant] + (to-NimbusInfo participant)) + participants))) + + (^void close[this] + (log-message "closing zookeeper connection of leader elector.") + (.close zk))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/storm/trident/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/storm/trident/testing.clj b/storm-core/src/clj/storm/trident/testing.clj deleted file mode 100644 index ac5fcab..0000000 --- a/storm-core/src/clj/storm/trident/testing.clj +++ /dev/null @@ -1,79 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns storm.trident.testing - (:require [backtype.storm.LocalDRPC :as LocalDRPC]) - (:import [storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs]) - (:require [backtype.storm [LocalDRPC]]) - (:import [backtype.storm LocalDRPC]) - (:import [backtype.storm.tuple Fields]) - (:import [backtype.storm.generated KillOptions]) - (:require [backtype.storm [testing :as t]]) - (:use [backtype.storm util]) - ) - -(defn local-drpc [] - (LocalDRPC.)) - -(defn exec-drpc [^LocalDRPC drpc function-name args] - (let [res (.execute drpc function-name args)] - (from-json res))) - -(defn exec-drpc-tuples [^LocalDRPC drpc function-name tuples] - (exec-drpc drpc function-name (to-json tuples))) - -(defn feeder-spout [fields] - (FeederBatchSpout. fields)) - -(defn feeder-committer-spout [fields] - (FeederCommitterBatchSpout. fields)) - -(defn feed [feeder tuples] - (.feed feeder tuples)) - -(defn fields [& fields] - (Fields. fields)) - -(defn memory-map-state [] - (MemoryMapState$Factory.)) - -(defmacro with-drpc [[drpc] & body] - `(let [~drpc (backtype.storm.LocalDRPC.)] - ~@body - (.shutdown ~drpc) - )) - -(defn with-topology* [cluster topo body-fn] - (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo)) - (body-fn) - (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))) - ) - -(defmacro with-topology [[cluster topo] & body] - `(with-topology* ~cluster ~topo (fn [] ~@body))) - -(defn bootstrap-imports [] - (import 'backtype.storm.LocalDRPC) - (import 'storm.trident.TridentTopology) - (import '[storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN TupleCollectionGet]) - ) - -(defn drpc-tuples-input [topology function-name drpc outfields] - (-> topology - (.newDRPCStream function-name drpc) - (.each (fields "args") (TuplifyArgs.) outfields) - )) - - http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/genthrift.sh ---------------------------------------------------------------------- diff --git a/storm-core/src/genthrift.sh b/storm-core/src/genthrift.sh index 54cd10a..eeec78a 100644 --- a/storm-core/src/genthrift.sh +++ b/storm-core/src/genthrift.sh @@ -16,9 +16,9 @@ # limitations under the License. rm -rf gen-javabean gen-py py -rm -rf jvm/backtype/storm/generated +rm -rf jvm/org/apache/storm/generated thrift --gen java:beans,hashcode,nocamel,generated_annotations=undated --gen py:utf8strings storm.thrift -for file in gen-javabean/backtype/storm/generated/* ; do +for file in gen-javabean/org/apache/storm/generated/* ; do cat java_license_header.txt ${file} > ${file}.tmp mv -f ${file}.tmp ${file} done @@ -28,6 +28,6 @@ for file in gen-py/storm/* ; do cat py_license_header.txt ${file} > ${file}.tmp mv -f ${file}.tmp ${file} done -mv gen-javabean/backtype/storm/generated jvm/backtype/storm/generated +mv gen-javabean/org/apache/storm/generated jvm/org/apache/storm/generated mv gen-py py rm -rf gen-javabean
