Github user knusbaum commented on a diff in the pull request:
https://github.com/apache/storm/pull/1030#discussion_r50453900
--- Diff: storm-core/src/clj/org/apache/storm/config.clj ---
@@ -41,291 +38,18 @@
(dofor [f (seq (.getFields Config))]
(.get f nil)))
-
+;; TODO this function and its callings will be replace when nimbus and
supervisor move to Java
(defn cluster-mode
[conf & args]
(keyword (conf STORM-CLUSTER-MODE)))
-(defn local-mode?
- [conf]
- (let [mode (conf STORM-CLUSTER-MODE)]
- (condp = mode
- "local" true
- "distributed" false
- (throw (IllegalArgumentException.
- (str "Illegal cluster mode in conf: " mode))))))
-
(defn sampling-rate
[conf]
(->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
- (/ 1)
- int))
+ (/ 1)
+ int))
+;; TODO this function together with sampling-rate are to be replaced with
Java version when util.clj is in
(defn mk-stats-sampler
[conf]
- (even-sampler (sampling-rate conf)))
-
-(defn read-default-config
- []
- (clojurify-structure (Utils/readDefaultConfig)))
-
-(defn validate-configs-with-schemas
- [conf]
- (ConfigValidation/validateFields conf))
-
-(defn read-storm-config
- []
- (let [conf (clojurify-structure (Utils/readStormConfig))]
- (validate-configs-with-schemas conf)
- conf))
-
-(defn read-yaml-config
- ([name must-exist]
- (let [conf (clojurify-structure (Utils/findAndReadConfigFile name
must-exist))]
- (validate-configs-with-schemas conf)
- conf))
- ([name]
- (read-yaml-config true)))
-
-(defn absolute-storm-local-dir [conf]
- (let [storm-home (System/getProperty "storm.home")
- path (conf STORM-LOCAL-DIR)]
- (if path
- (if (is-absolute-path? path) path (str storm-home
file-path-separator path))
- (str storm-home file-path-separator "storm-local"))))
-
-(def LOG-DIR
- (.getCanonicalPath
- (clojure.java.io/file (or (System/getProperty "storm.log.dir")
- (get (read-storm-config) "storm.log.dir")
- (str (System/getProperty "storm.home")
file-path-separator "logs")))))
-
-(defn absolute-healthcheck-dir [conf]
- (let [storm-home (System/getProperty "storm.home")
- path (conf STORM-HEALTH-CHECK-DIR)]
- (if path
- (if (is-absolute-path? path) path (str storm-home
file-path-separator path))
- (str storm-home file-path-separator "healthchecks"))))
-
-(defn master-local-dir
- [conf]
- (let [ret (str (absolute-storm-local-dir conf) file-path-separator
"nimbus")]
- (FileUtils/forceMkdir (File. ret))
- ret))
-
-(defn master-stormjar-key
- [topology-id]
- (str topology-id "-stormjar.jar"))
-
-(defn master-stormcode-key
- [topology-id]
- (str topology-id "-stormcode.ser"))
-
-(defn master-stormconf-key
- [topology-id]
- (str topology-id "-stormconf.ser"))
-
-(defn master-stormdist-root
- ([conf]
- (str (master-local-dir conf) file-path-separator "stormdist"))
- ([conf storm-id]
- (str (master-stormdist-root conf) file-path-separator storm-id)))
-
-(defn master-tmp-dir
- [conf]
- (let [ret (str (master-local-dir conf) file-path-separator "tmp")]
- (FileUtils/forceMkdir (File. ret))
- ret ))
-
-(defn read-supervisor-storm-conf-given-path
- [conf stormconf-path]
- (merge conf (clojurify-structure (Utils/fromCompressedJsonConf
(FileUtils/readFileToByteArray (File. stormconf-path))))))
-
-(defn master-storm-metafile-path [stormroot ]
- (str stormroot file-path-separator "storm-code-distributor.meta"))
-
-(defn master-stormjar-path
- [stormroot]
- (str stormroot file-path-separator "stormjar.jar"))
-
-(defn master-stormcode-path
- [stormroot]
- (str stormroot file-path-separator "stormcode.ser"))
-
-(defn master-stormconf-path
- [stormroot]
- (str stormroot file-path-separator "stormconf.ser"))
-
-(defn master-inbox
- [conf]
- (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
- (FileUtils/forceMkdir (File. ret))
- ret ))
-
-(defn master-inimbus-dir
- [conf]
- (str (master-local-dir conf) file-path-separator "inimbus"))
-
-(defn supervisor-local-dir
- [conf]
- (let [ret (str (absolute-storm-local-dir conf) file-path-separator
"supervisor")]
- (FileUtils/forceMkdir (File. ret))
- ret))
-
-(defn supervisor-isupervisor-dir
- [conf]
- (str (supervisor-local-dir conf) file-path-separator "isupervisor"))
-
-(defn supervisor-stormdist-root
- ([conf]
- (str (supervisor-local-dir conf) file-path-separator "stormdist"))
- ([conf storm-id]
- (str (supervisor-stormdist-root conf) file-path-separator (url-encode
storm-id))))
-
-(defn supervisor-stormjar-path [stormroot]
- (str stormroot file-path-separator "stormjar.jar"))
-
-(defn supervisor-storm-metafile-path [stormroot]
- (str stormroot file-path-separator "storm-code-distributor.meta"))
-
-(defn supervisor-stormcode-path
- [stormroot]
- (str stormroot file-path-separator "stormcode.ser"))
-
-(defn supervisor-stormconf-path
- [stormroot]
- (str stormroot file-path-separator "stormconf.ser"))
-
-(defn supervisor-tmp-dir
- [conf]
- (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
- (FileUtils/forceMkdir (File. ret))
- ret ))
-
-(defn supervisor-storm-resources-path
- [stormroot]
- (str stormroot file-path-separator RESOURCES-SUBDIR))
-
-(defn ^LocalState supervisor-state
- [conf]
- (LocalState. (str (supervisor-local-dir conf) file-path-separator
"localstate")))
-
-(defn ^LocalState nimbus-topo-history-state
- [conf]
- (LocalState. (str (master-local-dir conf) file-path-separator
"history")))
-
-(defn read-supervisor-storm-conf
- [conf storm-id]
- (let [stormroot (supervisor-stormdist-root conf storm-id)
- conf-path (supervisor-stormconf-path stormroot)]
- (read-supervisor-storm-conf-given-path conf conf-path)))
-
-(defn read-supervisor-topology
- [conf storm-id]
- (let [stormroot (supervisor-stormdist-root conf storm-id)
- topology-path (supervisor-stormcode-path stormroot)]
- (Utils/deserialize (FileUtils/readFileToByteArray (File.
topology-path)) StormTopology)
- ))
-
-(defn worker-user-root [conf]
- (str (absolute-storm-local-dir conf) "/workers-users"))
-
-(defn worker-user-file [conf worker-id]
- (str (worker-user-root conf) "/" worker-id))
-
-(defn get-worker-user [conf worker-id]
- (log-message "GET worker-user " worker-id)
- (try
- (str/trim (slurp (worker-user-file conf worker-id)))
- (catch IOException e
- (log-warn-error e "Failed to get worker user for " worker-id ".")
- nil
- )))
-
-(defn get-id-from-blob-key
- [key]
- (if-let [groups (re-find
#"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)]
- (nth groups 1)))
-
-(defn set-worker-user! [conf worker-id user]
- (log-message "SET worker-user " worker-id " " user)
- (let [file (worker-user-file conf worker-id)]
- (.mkdirs (.getParentFile (File. file)))
- (spit (worker-user-file conf worker-id) user)))
-
-(defn remove-worker-user! [conf worker-id]
- (log-message "REMOVE worker-user " worker-id)
- (.delete (File. (worker-user-file conf worker-id))))
-
-(defn worker-artifacts-root
- ([conf]
- (let [workers-artifacts-dir (conf STORM-WORKERS-ARTIFACTS-DIR)]
- (if workers-artifacts-dir
- (if (is-absolute-path? workers-artifacts-dir)
- workers-artifacts-dir
- (str LOG-DIR file-path-separator workers-artifacts-dir))
- (str LOG-DIR file-path-separator "workers-artifacts"))))
- ([conf id]
- (str (worker-artifacts-root conf) file-path-separator id))
- ([conf id port]
- (str (worker-artifacts-root conf id) file-path-separator port)))
-
-(defn worker-artifacts-pid-path
- [conf id port]
- (str (worker-artifacts-root conf id port) file-path-separator
"worker.pid"))
-
-(defn get-log-metadata-file
- ([fname]
- (let [[id port & _] (str/split fname (re-pattern file-path-separator))]
- (get-log-metadata-file (read-storm-config) id port)))
- ([conf id port]
- (clojure.java.io/file (str (worker-artifacts-root conf id)
file-path-separator port file-path-separator) "worker.yaml")))
-
-(defn get-worker-dir-from-root
- [log-root id port]
- (clojure.java.io/file (str log-root file-path-separator id
file-path-separator port)))
-
-(defn worker-root
- ([conf]
- (str (absolute-storm-local-dir conf) file-path-separator "workers"))
- ([conf id]
- (str (worker-root conf) file-path-separator id)))
-
-(defn worker-pids-root
- [conf id]
- (str (worker-root conf id) file-path-separator "pids"))
-
-(defn worker-pid-path
- [conf id pid]
- (str (worker-pids-root conf id) file-path-separator pid))
-
-(defn worker-heartbeats-root
- [conf id]
- (str (worker-root conf id) file-path-separator "heartbeats"))
-
-;; workers heartbeat here with pid and timestamp
-;; if supervisor stops receiving heartbeat, it kills and restarts the
process
-;; in local mode, keep a global map of ids to threads for simulating
process management
-(defn ^LocalState worker-state
- [conf id]
- (LocalState. (worker-heartbeats-root conf id)))
-
-(defn override-login-config-with-system-property [conf]
- (if-let [login_conf_file (System/getProperty
"java.security.auth.login.config")]
- (assoc conf "java.security.auth.login.config" login_conf_file)
- conf))
-
-(defn get-topo-logs-users
- [topology-conf]
- (sort (distinct (remove nil?
- (concat
- (topology-conf LOGS-USERS)
- (topology-conf TOPOLOGY-USERS))))))
-
-(defn get-topo-logs-groups
- [topology-conf]
- (sort (distinct (remove nil?
- (concat
- (topology-conf LOGS-GROUPS)
- (topology-conf TOPOLOGY-GROUPS))))))
-
+ (even-sampler (sampling-rate conf)))
--- End diff --
Please don't remove the newline from this file.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---