http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
new file mode 100644
index 0000000..39a9c12
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -0,0 +1,1199 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.logviewer
+ (:use compojure.core)
+ (:use [clojure.set :only [difference intersection]])
+ (:use [clojure.string :only [blank? split]])
+ (:use [hiccup core page-helpers form-helpers])
+ (:use [org.apache.storm config util log timer])
+ (:use [org.apache.storm.ui helpers])
+ (:import [org.apache.storm.utils Utils VersionInfo])
+ (:import [org.slf4j LoggerFactory])
+ (:import [java.util Arrays ArrayList HashSet])
+ (:import [java.util.zip GZIPInputStream])
+ (:import [org.apache.logging.log4j LogManager])
+ (:import [org.apache.logging.log4j.core Appender LoggerContext])
+ (:import [org.apache.logging.log4j.core.appender RollingFileAppender])
+ (:import [java.io BufferedInputStream File FileFilter FileInputStream
+ InputStream InputStreamReader])
+ (:import [java.nio.file Files Path Paths DirectoryStream])
+ (:import [java.nio ByteBuffer])
+ (:import [org.apache.storm.daemon DirectoryCleaner])
+ (:import [org.yaml.snakeyaml Yaml]
+ [org.yaml.snakeyaml.constructor SafeConstructor])
+ (:import [org.apache.storm.ui InvalidRequestException]
+ [org.apache.storm.security.auth AuthUtils])
+ (:require [org.apache.storm.daemon common [supervisor :as supervisor]])
+ (:require [compojure.route :as route]
+ [compojure.handler :as handler]
+ [ring.middleware.keyword-params]
+ [ring.util.codec :as codec]
+ [ring.util.response :as resp]
+ [clojure.string :as string])
+ (:require [metrics.meters :refer [defmeter mark!]])
+ (:use [org.apache.storm.daemon.common :only [start-metrics-reporters]])
+ (:gen-class))
+
+(def ^:dynamic *STORM-CONF* (read-storm-config))
+(def STORM-VERSION (VersionInfo/getVersion))
+
+(defmeter logviewer:num-log-page-http-requests)
+(defmeter logviewer:num-daemonlog-page-http-requests)
+(defmeter logviewer:num-download-log-file-http-requests)
+(defmeter logviewer:num-download-log-daemon-file-http-requests)
+(defmeter logviewer:num-list-logs-http-requests)
+
+(defn cleanup-cutoff-age-millis [conf now-millis]
+ (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
+
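+;; A minimal sketch of the cutoff arithmetic, assuming a hypothetical conf map
+;; with a 10-minute cleanup age: anything last modified more than 600,000 ms
+;; before now is eligible for cleanup.
+(comment
+  (cleanup-cutoff-age-millis {LOGVIEWER-CLEANUP-AGE-MINS 10} 1000000)
+  ;; => 400000
+  )
+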
+(defn get-stream-for-dir
+ [^File f]
+ (try (Files/newDirectoryStream (.toPath f))
+ (catch Exception ex (log-error ex) nil)))
+
+(defn- last-modifiedtime-worker-logdir
+ "Return the last modified time for all log files in a worker's log dir.
+ Using stream rather than File.listFiles is to avoid large mem usage
+ when a directory has too many files"
+ [^File log-dir]
+ (let [^DirectoryStream stream (get-stream-for-dir log-dir)
+ dir-modified (.lastModified log-dir)
+ last-modified (try (reduce
+ (fn [maximum path]
+ (let [curr (.lastModified (.toFile path))]
+ (if (> curr maximum)
+ curr
+ maximum)))
+ dir-modified
+ stream)
+ (catch Exception ex
+ (log-error ex) dir-modified)
+ (finally
+ (if (instance? DirectoryStream stream)
+ (.close stream))))]
+ last-modified))
+
+(defn get-size-for-logdir
+ "Return the sum of lengths for all log files in a worker's log dir.
+ Using stream rather than File.listFiles is to avoid large mem usage
+ when a directory has too many files"
+ [log-dir]
+ (let [^DirectoryStream stream (get-stream-for-dir log-dir)]
+ (reduce
+ (fn [sum path]
+ (let [size (.length (.toFile path))]
+ (+ sum size)))
+ 0
+ stream)))
+
+(defn mk-FileFilter-for-log-cleanup [conf now-millis]
+ (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
+ (reify FileFilter (^boolean accept [this ^File file]
+ (boolean (and
+ (not (.isFile file))
+ (<= (last-modifiedtime-worker-logdir file)
cutoff-age-millis)))))))
+
+(defn select-dirs-for-cleanup [conf now-millis root-dir]
+ (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
+ (reduce clojure.set/union
+ (sorted-set)
+ (for [^File topo-dir (.listFiles (File. root-dir))]
+ (into [] (.listFiles topo-dir file-filter))))))
+
+(defn get-topo-port-workerlog
+ "Return the path of the worker log with the format of
topoId/port/worker.log.*"
+ [^File file]
+ (clojure.string/join file-path-separator
+ (take-last 3
+ (split (.getCanonicalPath file) (re-pattern file-path-separator)))))
+
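+;; Illustrative call, assuming a hypothetical worker-artifacts layout; the
+;; last three path segments are topoId/port/worker.log:
+(comment
+  (get-topo-port-workerlog
+    (File. "/var/storm/workers-artifacts/topo-1/6700/worker.log"))
+  ;; => "topo-1/6700/worker.log" (joined with the platform's file-path-separator)
+  )
+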
+(defn get-metadata-file-for-log-root-name [root-name root-dir]
+ (let [metaFile (clojure.java.io/file root-dir "metadata"
+ (str root-name ".yaml"))]
+ (if (.exists metaFile)
+ metaFile
+ (do
+ (log-warn "Could not find " (.getCanonicalPath metaFile)
+ " to clean up for " root-name)
+ nil))))
+
+(defn get-metadata-file-for-worker-logdir [logdir]
+ (let [metaFile (clojure.java.io/file logdir "worker.yaml")]
+ (if (.exists metaFile)
+ metaFile
+ (do
+ (log-warn "Could not find " (.getCanonicalPath metaFile)
+ " to clean up for " logdir)
+ nil))))
+
+(defn get-worker-id-from-metadata-file [metaFile]
+ (get (clojure-from-yaml-file metaFile) "worker-id"))
+
+(defn get-topo-owner-from-metadata-file [metaFile]
+ (get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
+
+(defn identify-worker-log-dirs
+ "Return the worker-id to worker-log-dir map."
+ [log-dirs]
+ (into {} (for [logdir log-dirs
+ :let [metaFile (get-metadata-file-for-worker-logdir logdir)]
+ :when metaFile]
+ {(get-worker-id-from-metadata-file metaFile) logdir})))
+
+(defn get-alive-ids
+ [conf now-secs]
+ (->>
+ (supervisor/read-worker-heartbeats conf)
+ (remove
+ #(or (not (val %))
+ (supervisor/is-worker-hb-timed-out? now-secs
+ (val %)
+ conf)))
+ keys
+ set))
+
+(defn get-dead-worker-dirs
+ "Return a sorted set of java.io.Files that were written by workers that are
+ now dead"
+ [conf now-secs log-dirs]
+ (if (empty? log-dirs)
+ (sorted-set)
+ (let [alive-ids (get-alive-ids conf now-secs)
+ id->dir (identify-worker-log-dirs log-dirs)]
+ (apply sorted-set
+ (for [[id dir] id->dir
+ :when (not (contains? alive-ids id))]
+ dir)))))
+
+(defn get-all-worker-dirs [^File root-dir]
+ (reduce clojure.set/union
+ (sorted-set)
+ (for [^File topo-dir (.listFiles root-dir)]
+ (into [] (.listFiles topo-dir)))))
+
+(defn get-alive-worker-dirs
+ "Return a sorted set of java.io.Files that were written by workers that are
+ now active"
+ [conf root-dir]
+ (let [alive-ids (get-alive-ids conf (current-time-secs))
+ log-dirs (get-all-worker-dirs root-dir)
+ id->dir (identify-worker-log-dirs log-dirs)]
+ (apply sorted-set
+ (for [[id dir] id->dir
+ :when (contains? alive-ids id)]
+ (.getCanonicalPath dir)))))
+
+(defn get-all-logs-for-rootdir [^File log-dir]
+ (reduce concat
+ (for [port-dir (get-all-worker-dirs log-dir)]
+ (into [] (DirectoryCleaner/getFilesForDir port-dir)))))
+
+(defn is-active-log [^File file]
+ (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file)))
+
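+;; Sketch of the filename test; only suffixes in the pattern count as active:
+(comment
+  (is-active-log (File. "worker.log"))      ;; truthy match
+  (is-active-log (File. "worker.log.1.gz")) ;; => nil
+  )
+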
+(defn sum-file-size
+ "Given a sequence of Files, sum their sizes."
+ [files]
+ (reduce #(+ %1 (.length %2)) 0 files))
+
+(defn per-workerdir-cleanup!
+ "Delete the oldest files in each overloaded worker log dir"
+ [^File root-dir size ^DirectoryCleaner cleaner]
+ (dofor [worker-dir (get-all-worker-dirs root-dir)]
+ (.deleteOldestWhileTooLarge cleaner (ArrayList. [worker-dir]) size true nil)))
+
+(defn global-log-cleanup!
+ "Delete the oldest files in overloaded worker-artifacts globally"
+ [^File root-dir size ^DirectoryCleaner cleaner]
+ (let [worker-dirs (ArrayList. (get-all-worker-dirs root-dir))
+ alive-worker-dirs (HashSet. (get-alive-worker-dirs *STORM-CONF* root-dir))]
+ (.deleteOldestWhileTooLarge cleaner worker-dirs size false alive-worker-dirs)))
+
+(defn cleanup-empty-topodir!
+ "Delete the topo dir if it contains zero port dirs"
+ [^File dir]
+ (let [topodir (.getParentFile dir)]
+ (if (empty? (.listFiles topodir))
+ (rmr (.getCanonicalPath topodir)))))
+
+(defn cleanup-fn!
+ "Delete old log dirs for which the workers are no longer alive"
+ [log-root-dir]
+ (let [now-secs (current-time-secs)
+ old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
+ (* now-secs 1000)
+ log-root-dir)
+ total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB)
+ per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB)
+ per-dir-size (min per-dir-size (* total-size 0.5))
+ cleaner (DirectoryCleaner.)
+ dead-worker-dirs (get-dead-worker-dirs *STORM-CONF*
+ now-secs
+ old-log-dirs)]
+ (log-debug "log cleanup: now=" now-secs
+ " old log dirs " (pr-str (map #(.getName %) old-log-dirs))
+ " dead worker dirs " (pr-str
+ (map #(.getName %) dead-worker-dirs)))
+ (dofor [dir dead-worker-dirs]
+ (let [path (.getCanonicalPath dir)]
+ (log-message "Cleaning up: Removing " path)
+ (try (rmr path)
+ (cleanup-empty-topodir! dir)
+ (catch Exception ex (log-error ex)))))
+ (per-workerdir-cleanup! (File. log-root-dir) (* per-dir-size (* 1024 1024)) cleaner)
+ (let [size (* total-size (* 1024 1024))]
+ (global-log-cleanup! (File. log-root-dir) size cleaner))))
+
+(defn start-log-cleaner! [conf log-root-dir]
+ (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
+ (when interval-secs
+ (log-debug "starting log cleanup thread at interval: " interval-secs)
+ (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
+ :kill-fn (fn [t]
+ (log-error t "Error when doing
logs cleanup")
+ (exit-process! 20 "Error when
doing log cleanup")))
+ 0 ;; Start immediately.
+ interval-secs
+ (fn [] (cleanup-fn! log-root-dir))))))
+
+(defn- skip-bytes
+ "FileInputStream#skip may not work the first time, so ensure it successfully
+ skips the given number of bytes."
+ [^InputStream stream n]
+ (loop [skipped 0]
+ (let [skipped (+ skipped (.skip stream (- n skipped)))]
+ (if (< skipped n) (recur skipped)))))
+
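+;; Usage sketch with a hypothetical path: unlike a bare InputStream#skip call,
+;; this loops until all requested bytes are skipped.
+(comment
+  (with-open [in (FileInputStream. "/tmp/example.log")]
+    (skip-bytes in 1024)))
+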
+(defn logfile-matches-filter?
+ [log-file-name]
+ (let [regex-string (str "worker.log.*")
+ regex-pattern (re-pattern regex-string)]
+ (not= (re-seq regex-pattern (.toString log-file-name)) nil)))
+
+(defn page-file
+ ([path tail]
+ (let [zip-file? (.endsWith path ".gz")
+ flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+ skip (- flen tail)]
+ (page-file path skip tail)))
+ ([path start length]
+ (let [zip-file? (.endsWith path ".gz")
+ flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))]
+ (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. path)) (FileInputStream. path))
+ output (java.io.ByteArrayOutputStream.)]
+ (if (>= start flen)
+ (throw
+ (InvalidRequestException. "Cannot start past the end of the
file")))
+ (if (> start 0) (skip-bytes input start))
+ (let [buffer (make-array Byte/TYPE 1024)]
+ (loop []
+ (when (< (.size output) length)
+ (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
+ (when (pos? size)
+ (.write output buffer 0 size)
+ (recur)))))
+ (.toString output))))))
+
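+;; Hedged examples of the two arities, with a hypothetical log path: the 2-arg
+;; form returns the final `tail` bytes, the 3-arg form a window of the file.
+(comment
+  (page-file "/var/storm/workers-artifacts/topo-1/6700/worker.log" 4096)
+  (page-file "/var/storm/workers-artifacts/topo-1/6700/worker.log" 1024 4096))
+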
+(defn get-log-user-group-whitelist [fname]
+ (let [wl-file (get-log-metadata-file fname)
+ m (clojure-from-yaml-file wl-file)]
+ (if (not-nil? m)
+ (do
+ (let [user-wl (.get m LOGS-USERS)
+ user-wl (if user-wl user-wl [])
+ group-wl (.get m LOGS-GROUPS)
+ group-wl (if group-wl group-wl [])]
+ [user-wl group-wl]))
+ nil)))
+
+(def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin *STORM-CONF*))
+(defn user-groups
+ [user]
+ (if (blank? user) [] (.getGroups igroup-mapper user)))
+
+(defn authorized-log-user? [user fname conf]
+ (if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist fname)))
+ nil
+ (let [groups (user-groups user)
+ [user-wl group-wl] (get-log-user-group-whitelist fname)
+ logs-users (concat (conf LOGS-USERS)
+ (conf NIMBUS-ADMINS)
+ user-wl)
+ logs-groups (concat (conf LOGS-GROUPS)
+ group-wl)]
+ (or (some #(= % user) logs-users)
+ (< 0 (.size (intersection (set groups) (set logs-groups))))))))
+
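+;; Sketch of the authorization rule, assuming hypothetical conf values: a user
+;; passes if named in any whitelist or if one of their groups is whitelisted.
+;; (The worker's own metadata yaml is also consulted via
+;; get-log-user-group-whitelist, so this reads the filesystem.)
+(comment
+  (authorized-log-user? "alice"
+                        "topo-1/6700/worker.log"
+                        {LOGS-USERS ["alice"] NIMBUS-ADMINS [] LOGS-GROUPS []}))
+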
+(defn log-root-dir
+ "Given an appender name, as configured, get the parent directory of the
appender's log file.
+ Note that if anything goes wrong, this will throw an Error and exit."
+ [appender-name]
+ (let [appender (.getAppender (.getConfiguration (LogManager/getContext)) appender-name)]
+ (if (and appender-name appender (instance? RollingFileAppender appender))
+ (.getParent (File. (.getFileName appender)))
+ (throw
+ (RuntimeException. "Log viewer could not find configured appender, or
the appender is not a FileAppender. Please check that the appender name
configured in storm and log4j agree.")))))
+
+(defnk to-btn-link
+ "Create a link that is formatted like a button"
+ [url text :enabled true]
+ [:a {:href (java.net.URI. url)
+ :class (str "btn btn-default " (if enabled "enabled" "disabled"))}
text])
+
+(defn search-file-form [fname]
+ [[:form {:action "logviewer_search.html" :id "search-box"}
+ "Search this file:"
+ [:input {:type "text" :name "search"}]
+ [:input {:type "hidden" :name "file" :value fname}]
+ [:input {:type "submit" :value "Search"}]]])
+
+(defn log-file-selection-form [log-files type]
+ [[:form {:action type :id "list-of-files"}
+ (drop-down "file" log-files)
+ [:input {:type "submit" :value "Switch file"}]]])
+
+(defn pager-links [fname start length file-size]
+ (let [prev-start (max 0 (- start length))
+ next-start (if (> file-size 0)
+ (min (max 0 (- file-size length)) (+ start length))
+ (+ start length))]
+ [[:div
+ (concat
+ [(to-btn-link (url "/log"
+ {:file fname
+ :start (max 0 (- start length))
+ :length length})
+ "Prev" :enabled (< prev-start start))]
+ [(to-btn-link (url "/log"
+ {:file fname
+ :start 0
+ :length length}) "First")]
+ [(to-btn-link (url "/log"
+ {:file fname
+ :length length})
+ "Last")]
+ [(to-btn-link (url "/log"
+ {:file fname
+ :start (min (max 0 (- file-size length))
+ (+ start length))
+ :length length})
+ "Next" :enabled (> next-start start))])]]))
+
+(defn- download-link [fname]
+ [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
+
+(defn- daemon-download-link [fname]
+ [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full
File")]])
+
+(defn- is-txt-file [fname]
+ (re-find #"\.(log.*|txt|yaml|pid)$" fname))
+
+(def default-bytes-per-page 51200)
+
+(defn log-page [fname start length grep user root-dir]
+ (if (or (blank? (*STORM-CONF* UI-FILTER))
+ (authorized-log-user? user fname *STORM-CONF*))
+ (let [file (.getCanonicalFile (File. root-dir fname))
+ path (.getCanonicalPath file)
+ zip-file? (.endsWith path ".gz")
+ topo-dir (.getParentFile (.getParentFile file))]
+ (if (and (.exists file)
+ (= (.getCanonicalFile (File. root-dir))
+ (.getParentFile topo-dir)))
+ (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+ log-files (reduce clojure.set/union
+ (sorted-set)
+ (for [^File port-dir (.listFiles topo-dir)]
+ (into [] (filter #(.isFile %) (DirectoryCleaner/getFilesForDir port-dir))))) ;all types of files included
+ files-str (for [file log-files]
+ (get-topo-port-workerlog file))
+ reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+ length (if length
+ (min 10485760 length)
+ default-bytes-per-page)
+ log-string (escape-html
+ (if (is-txt-file fname)
+ (if start
+ (page-file path start length)
+ (page-file path length))
+ "This is a binary file and cannot display! You
may download the full file."))
+ start (or start (- file-length length))]
+ (if grep
+ (html [:pre#logContent
+ (if grep
+ (->> (.split log-string "\n")
+ (filter #(.contains % grep))
+ (string/join "\n"))
+ log-string)])
+ (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
+ (html (concat (search-file-form fname)
+ (log-file-selection-form reordered-files-str "log") ; list all files for this topology
+ pager-data
+ (download-link fname)
+ [[:pre#logContent log-string]]
+ pager-data)))))
+ (-> (resp/response "Page not found")
+ (resp/status 404))))
+ (if (nil? (get-log-user-group-whitelist fname))
+ (-> (resp/response "Page not found")
+ (resp/status 404))
+ (unauthorized-user-html user))))
+
+(defn daemonlog-page [fname start length grep user root-dir]
+ (let [file (.getCanonicalFile (File. root-dir fname))
+ file-length (.length file)
+ path (.getCanonicalPath file)
+ zip-file? (.endsWith path ".gz")]
+ (if (and (= (.getCanonicalFile (File. root-dir))
+ (.getParentFile file))
+ (.exists file))
+ (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+ length (if length
+ (min 10485760 length)
+ default-bytes-per-page)
+ log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included
+ files-str (for [file log-files]
+ (.getName file))
+ reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+ log-string (escape-html
+ (if (is-txt-file fname)
+ (if start
+ (page-file path start length)
+ (page-file path length))
+ "This is a binary file and cannot display! You may
download the full file."))
+ start (or start (- file-length length))]
+ (if grep
+ (html [:pre#logContent
+ (if grep
+ (->> (.split log-string "\n")
+ (filter #(.contains % grep))
+ (string/join "\n"))
+ log-string)])
+ (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
+ (html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
+ pager-data
+ (daemon-download-link fname)
+ [[:pre#logContent log-string]]
+ pager-data)))))
+ (-> (resp/response "Page not found")
+ (resp/status 404)))))
+
+(defn download-log-file [fname req resp user ^String root-dir]
+ (let [file (.getCanonicalFile (File. root-dir fname))]
+ (if (.exists file)
+ (if (or (blank? (*STORM-CONF* UI-FILTER))
+ (authorized-log-user? user fname *STORM-CONF*))
+ (-> (resp/response file)
+ (resp/content-type "application/octet-stream"))
+ (unauthorized-user-html user))
+ (-> (resp/response "Page not found")
+ (resp/status 404)))))
+
+(def grep-max-search-size 1024)
+(def grep-buf-size 2048)
+(def grep-context-size 128)
+
+(defn logviewer-port
+ []
+ (int (*STORM-CONF* LOGVIEWER-PORT)))
+
+(defn url-to-match-centered-in-log-page
+ [needle fname offset port]
+ (let [host (local-hostname)
+ port (logviewer-port)
+ fname (clojure.string/join file-path-separator (take-last 3 (split fname (re-pattern file-path-separator))))]
+ (url (str "http://" host ":" port "/log")
+ {:file fname
+ :start (max 0
+ (- offset
+ (int (/ default-bytes-per-page 2))
+ (int (/ (alength needle) -2)))) ;; dividing by -2 then subtracting adds half the needle length, centering the match
+ :length default-bytes-per-page})))
+
+(defnk mk-match-data
+ [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname
+ :before-bytes nil :after-bytes nil]
+ (let [url (url-to-match-centered-in-log-page needle
+ fname
+ file-offset
+ (*STORM-CONF* LOGVIEWER-PORT))
+ haystack-bytes (.array haystack)
+ before-string (if (>= haystack-offset grep-context-size)
+ (String. haystack-bytes
+ (- haystack-offset grep-context-size)
+ grep-context-size
+ "UTF-8")
+ (let [num-desired (max 0 (- grep-context-size
+ haystack-offset))
+ before-size (if before-bytes
+ (alength before-bytes)
+ 0)
+ num-expected (min before-size num-desired)]
+ (if (pos? num-expected)
+ (str (String. before-bytes
+ (- before-size num-expected)
+ num-expected
+ "UTF-8")
+ (String. haystack-bytes
+ 0
+ haystack-offset
+ "UTF-8"))
+ (String. haystack-bytes
+ 0
+ haystack-offset
+ "UTF-8"))))
+ after-string (let [needle-size (alength needle)
+ after-offset (+ haystack-offset needle-size)
+ haystack-size (.limit haystack)]
+ (if (< (+ after-offset grep-context-size) haystack-size)
+ (String. haystack-bytes
+ after-offset
+ grep-context-size
+ "UTF-8")
+ (let [num-desired (- grep-context-size
+ (- haystack-size after-offset))
+ after-size (if after-bytes
+ (alength after-bytes)
+ 0)
+ num-expected (min after-size num-desired)]
+ (if (pos? num-expected)
+ (str (String. haystack-bytes
+ after-offset
+ (- haystack-size after-offset)
+ "UTF-8")
+ (String. after-bytes 0 num-expected "UTF-8"))
+ (String. haystack-bytes
+ after-offset
+ (- haystack-size after-offset)
+ "UTF-8")))))]
+ {"byteOffset" file-offset
+ "beforeString" before-string
+ "afterString" after-string
+ "matchString" (String. needle "UTF-8")
+ "logviewerURL" url}))
+
+(defn- try-read-ahead!
+ "Tries once to read ahead in the stream to fill the context and resets the
+ stream to its position before the call."
+ [^BufferedInputStream stream haystack offset file-len bytes-read]
+ (let [num-expected (min (- file-len bytes-read)
+ grep-context-size)
+ after-bytes (byte-array num-expected)]
+ (.mark stream num-expected)
+ ;; Only try reading once.
+ (.read stream after-bytes 0 num-expected)
+ (.reset stream)
+ after-bytes))
+
+(defn offset-of-bytes
+ "Searches a given byte array for a match of a sub-array of bytes. Returns
+ the offset to the byte that matches, or -1 if no match was found."
+ [^bytes buf ^bytes value init-offset]
+ {:pre [(> (alength value) 0)
+ (not (neg? init-offset))]}
+ (loop [offset init-offset
+ candidate-offset init-offset
+ val-offset 0]
+ (if-not (pos? (- (alength value) val-offset))
+ ;; Found
+ candidate-offset
+ (if (>= offset (alength buf))
+ ;; We ran out of buffer for the search.
+ -1
+ (if (not= (aget value val-offset) (aget buf offset))
+ ;; The match at this candidate offset failed, so start over with the
+ ;; next candidate byte from the buffer.
+ (let [new-offset (inc candidate-offset)]
+ (recur new-offset new-offset 0))
+ ;; So far it matches. Keep going...
+ (recur (inc offset) candidate-offset (inc val-offset)))))))
+
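+;; A small self-contained example of the byte search:
+(comment
+  (offset-of-bytes (.getBytes "foobarbaz" "UTF-8")
+                   (.getBytes "bar" "UTF-8")
+                   0)
+  ;; => 3
+  )
+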
+(defn- buffer-substring-search!
+ "As the file is read into a buffer, 1/2 the buffer's size at a time, we
+ search the buffer for matches of the substring and return a list of zero or
+ more matches."
+ [file file-len offset-to-buf init-buf-offset stream bytes-skipped
+ bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches
+ ^bytes before-bytes]
+ (loop [buf-offset init-buf-offset
+ matches initial-matches]
+ (let [offset (offset-of-bytes (.array haystack) needle buf-offset)]
+ (if (and (< (count matches) num-matches) (not (neg? offset)))
+ (let [file-offset (+ offset-to-buf offset)
+ bytes-needed-after-match (- (.limit haystack)
+ grep-context-size
+ (alength needle))
+ before-arg (if (< offset grep-context-size) before-bytes)
+ after-arg (if (> offset bytes-needed-after-match)
+ (try-read-ahead! stream
+ haystack
+ offset
+ file-len
+ bytes-read))]
+ (recur (+ offset (alength needle))
+ (conj matches
+ (mk-match-data needle
+ haystack
+ offset
+ file-offset
+ (.getCanonicalPath file)
+ :before-bytes before-arg
+ :after-bytes after-arg))))
+ (let [before-str-to-offset (min (.limit haystack)
+ grep-max-search-size)
+ before-str-from-offset (max 0 (- before-str-to-offset
+ grep-context-size))
+ new-before-bytes (Arrays/copyOfRange (.array haystack)
+ before-str-from-offset
+ before-str-to-offset)
+ ;; It's OK if new-byte-offset is negative. This is normal if
+ ;; we are out of bytes to read from a small file.
+ new-byte-offset (if (>= (count matches) num-matches)
+ (+ (get (last matches) "byteOffset")
+ (alength needle))
+ (+ bytes-skipped
+ bytes-read
+ (- grep-max-search-size)))]
+ [matches new-byte-offset new-before-bytes])))))
+
+(defn- mk-grep-response
+ "This response data only includes a next byte offset if there is more of the
+ file to read."
+ [search-bytes offset matches next-byte-offset]
+ (merge {"searchString" (String. search-bytes "UTF-8")
+ "startByteOffset" offset
+ "matches" matches}
+ (and next-byte-offset {"nextByteOffset" next-byte-offset})))
+
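+;; Pure-function sketch; "nextByteOffset" is only merged in when non-nil:
+(comment
+  (mk-grep-response (.getBytes "err" "UTF-8") 0 [{"byteOffset" 42}] 1024)
+  ;; => {"searchString" "err", "startByteOffset" 0,
+  ;;     "matches" [{"byteOffset" 42}], "nextByteOffset" 1024}
+  )
+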
+(defn rotate-grep-buffer!
+ [^ByteBuffer buf ^BufferedInputStream stream total-bytes-read file file-len]
+ (let [buf-arr (.array buf)]
+ ;; Copy the 2nd half of the buffer to the first half.
+ (System/arraycopy buf-arr
+ grep-max-search-size
+ buf-arr
+ 0
+ grep-max-search-size)
+ ;; Zero-out the 2nd half to prevent accidental matches.
+ (Arrays/fill buf-arr
+ grep-max-search-size
+ (count buf-arr)
+ (byte 0))
+ ;; Fill the 2nd half with new bytes from the stream.
+ (let [bytes-read (.read stream
+ buf-arr
+ grep-max-search-size
+ (min file-len grep-max-search-size))]
+ (.limit buf (+ grep-max-search-size bytes-read))
+ (swap! total-bytes-read + bytes-read))))
+
+(defnk substring-search
+ "Searches for a substring in a log file, starting at the given offset,
+ returning the given number of matches, surrounded by the given number of
+ context lines. Other information is included to be useful for progressively
+ searching through a file for display in a UI. The search string must be
+ grep-max-search-size bytes or fewer when encoded as UTF-8."
+ [file ^String search-string :num-matches 10 :start-byte-offset 0]
+ {:pre [(not (empty? search-string))
+ (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]}
+ (let [zip-file? (.endsWith (.getName file) ".gz")
+ f-input-stream (FileInputStream. file)
+ gzipped-input-stream (if zip-file?
+ (GZIPInputStream. f-input-stream)
+ f-input-stream)
+ stream ^BufferedInputStream (BufferedInputStream.
+ gzipped-input-stream)
+ file-len (if zip-file? (Utils/zipFileSize file) (.length file))
+ buf ^ByteBuffer (ByteBuffer/allocate grep-buf-size)
+ buf-arr ^bytes (.array buf)
+ string nil
+ total-bytes-read (atom 0)
+ matches []
+ search-bytes ^bytes (.getBytes search-string "UTF-8")
+ num-matches (or num-matches 10)
+ start-byte-offset (or start-byte-offset 0)]
+ ;; Start at the part of the log file we are interested in.
+ ;; Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files
+ (if (> start-byte-offset file-len)
+ (throw
+ (InvalidRequestException. "Cannot search past the end of the file")))
+ (when (> start-byte-offset 0)
+ (skip-bytes stream start-byte-offset))
+ (java.util.Arrays/fill buf-arr (byte 0))
+ (let [bytes-read (.read stream buf-arr 0 (min file-len grep-buf-size))]
+ (.limit buf bytes-read)
+ (swap! total-bytes-read + bytes-read))
+ (loop [initial-matches []
+ init-buf-offset 0
+ byte-offset start-byte-offset
+ before-bytes nil]
+ (let [[matches new-byte-offset new-before-bytes]
+ (buffer-substring-search! file
+ file-len
+ byte-offset
+ init-buf-offset
+ stream
+ start-byte-offset
+ @total-bytes-read
+ buf
+ search-bytes
+ initial-matches
+ num-matches
+ before-bytes)]
+ (if (and (< (count matches) num-matches)
+ (< (+ @total-bytes-read start-byte-offset) file-len))
+ (let [;; The start index is positioned to find any possible
+ ;; occurrence of the search string that did not quite fit in the
+ ;; buffer on the previous read.
+ new-buf-offset (- (min (.limit ^ByteBuffer buf)
+ grep-max-search-size)
+ (alength search-bytes))]
+ (rotate-grep-buffer! buf stream total-bytes-read file file-len)
+ (when (< @total-bytes-read 0)
+ (throw (InvalidRequestException. "Cannot search past the end of
the file")))
+ (recur matches
+ new-buf-offset
+ new-byte-offset
+ new-before-bytes))
+ (mk-grep-response search-bytes
+ start-byte-offset
+ matches
+ (if-not (and (< (count matches) num-matches)
+ (>= @total-bytes-read file-len))
+ (let [next-byte-offset (+ (get (last matches)
+ "byteOffset")
+ (alength search-bytes))]
+ (if (> file-len next-byte-offset)
+ next-byte-offset)))))))))
+
+(defn- try-parse-int-param
+ [nam value]
+ (try
+ (Integer/parseInt value)
+ (catch java.lang.NumberFormatException e
+ (->
+ (str "Could not parse " nam " to an integer")
+ (InvalidRequestException. e)
+ throw))))
+
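+;; Behavior sketch:
+(comment
+  (try-parse-int-param "num-matches" "42") ;; => 42
+  (try-parse-int-param "num-matches" "x")  ;; throws InvalidRequestException
+  )
+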
+(defn search-log-file
+ [fname user ^String root-dir search num-matches offset callback origin]
+ (let [file (.getCanonicalFile (File. root-dir fname))]
+ (if (.exists file)
+ (if (or (blank? (*STORM-CONF* UI-FILTER))
+ (authorized-log-user? user fname *STORM-CONF*))
+ (let [num-matches-int (if num-matches
+ (try-parse-int-param "num-matches"
+ num-matches))
+ offset-int (if offset
+ (try-parse-int-param "start-byte-offset" offset))]
+ (try
+ (if (and (not (empty? search))
+ (<= (count (.getBytes search "UTF-8")) grep-max-search-size))
+ (json-response
+ (substring-search file
+ search
+ :num-matches num-matches-int
+ :start-byte-offset offset-int)
+ callback
+ :headers {"Access-Control-Allow-Origin" origin
+ "Access-Control-Allow-Credentials" "true"})
+ (throw
+ (InvalidRequestException.
+ (str "Search substring must be between 1 and 1024 UTF-8 "
+ "bytes in size (inclusive)"))))
+ (catch Exception ex
+ (json-response (exception->json ex) callback :status 500))))
+ (json-response (unauthorized-user-json user) callback :status 401))
+ (json-response {"error" "Not Found"
+ "errorMessage" "The file was not found on this node."}
+ callback
+ :status 404))))
+
+(defn find-n-matches [logs n file-offset offset search]
+ (let [logs (drop file-offset logs)
+ wrap-matches-fn (fn [matches]
+ {"fileOffset" file-offset
+ "searchString" search
+ "matches" matches})]
+ (loop [matches []
+ logs logs
+ offset offset
+ file-offset file-offset
+ match-count 0]
+ (if (empty? logs)
+ (wrap-matches-fn matches)
+ (let [these-matches (try
+ (log-debug "Looking through " (first logs))
+ (substring-search (first logs)
+ search
+ :num-matches (- n match-count)
+ :start-byte-offset offset)
+ (catch InvalidRequestException e
+ (log-error e "Can't search past end of file.")
+ {}))
+ file-name (get-topo-port-workerlog (first logs))
+ new-matches (conj matches
+ (merge these-matches
+ { "fileName" file-name
+ "port" (first (take-last 2 (split
(.getCanonicalPath (first logs)) (re-pattern file-path-separator))))}))
+ new-count (+ match-count (count (these-matches "matches")))]
+ (if (empty? these-matches)
+ (recur matches (rest logs) 0 (+ file-offset 1) match-count)
+ (if (>= new-count n)
+ (wrap-matches-fn new-matches)
+ (recur new-matches (rest logs) 0 (+ file-offset 1) new-count))))))))
+
+(defn logs-for-port
+ "Get the filtered, authorized, sorted log files for a port."
+ [user port-dir]
+ (let [filter-authorized-fn (fn [user logs]
+ (filter #(or
+ (blank? (*STORM-CONF* UI-FILTER))
+ (authorized-log-user? user (get-topo-port-workerlog %) *STORM-CONF*)) logs))]
+ (sort #(compare (.lastModified %2) (.lastModified %1))
+ (filter-authorized-fn
+ user
+ (filter #(re-find worker-log-filename-pattern (.getName %)) (DirectoryCleaner/getFilesForDir port-dir))))))
+
+(defn deep-search-logs-for-topology
+ [topology-id user ^String root-dir search num-matches port file-offset offset search-archived? callback origin]
+ (json-response
+ (if (or (not search) (not (.exists (File. (str root-dir file-path-separator topology-id)))))
+ []
+ (let [file-offset (if file-offset (Integer/parseInt file-offset) 0)
+ offset (if offset (Integer/parseInt offset) 0)
+ num-matches (if num-matches (Integer/parseInt num-matches) 1)
+ port-dirs (vec (.listFiles (File. (str root-dir file-path-separator topology-id))))
+ logs-for-port-fn (partial logs-for-port user)]
+ (if (or (not port) (= "*" port))
+ ;; Check for all ports
+ (let [filtered-logs (filter (comp not empty?) (map logs-for-port-fn port-dirs))]
+ (if search-archived?
+ (map #(find-n-matches % num-matches 0 0 search)
+ filtered-logs)
+ (map #(find-n-matches % num-matches 0 0 search)
+ (map (comp vector first) filtered-logs))))
+ ;; Check just the one port
+ (if (not (contains? (into #{} (map str (*STORM-CONF* SUPERVISOR-SLOTS-PORTS))) port))
+ []
+ (let [port-dir (File. (str root-dir file-path-separator topology-id file-path-separator port))]
+ (if (or (not (.exists port-dir)) (empty? (logs-for-port user port-dir)))
+ []
+ (let [filtered-logs (logs-for-port user port-dir)]
+ (if search-archived?
+ (find-n-matches filtered-logs num-matches file-offset offset search)
+ (find-n-matches [(first filtered-logs)] num-matches 0 offset search)))))))))
+ callback
+ :headers {"Access-Control-Allow-Origin" origin
+ "Access-Control-Allow-Credentials" "true"}))
+
+(defn log-template
+ ([body] (log-template body nil nil))
+ ([body fname user]
+ (html4
+ [:head
+ [:title (str (escape-html fname) " - Storm Log Viewer")]
+ (include-css "/css/bootstrap-3.3.1.min.css")
+ (include-css "/css/jquery.dataTables.1.10.4.min.css")
+ (include-css "/css/style.css")
+ ]
+ [:body
+ (concat
+ (when (not (blank? user)) [[:div.ui-user [:p "User: " user]]])
+ [[:div.ui-note [:p "Note: the drop-list shows at most 1024 files for
each worker directory."]]]
+ [[:h3 (escape-html fname)]]
+ (seq body))
+ ])))
+
+(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
+
+(defn- parse-long-from-map [m k]
+ (try
+ (Long/parseLong (k m))
+ (catch NumberFormatException ex
+ (throw (InvalidRequestException.
+ (str "Could not make an integer out of the query parameter '"
+ (name k) "'")
+ ex)))))
+
+(defn list-log-files
+ [user topoId port log-root callback origin]
+ (let [file-results
+ (if (nil? topoId)
+ (if (nil? port)
+ (get-all-logs-for-rootdir (File. log-root))
+ (reduce concat
+ (for [topo-dir (.listFiles (File. log-root))]
+ (reduce concat
+ (for [port-dir (.listFiles topo-dir)]
+ (if (= (str port) (.getName port-dir))
+ (into [] (DirectoryCleaner/getFilesForDir port-dir))))))))
+ (if (nil? port)
+ (let [topo-dir (File. (str log-root file-path-separator topoId))]
+ (if (.exists topo-dir)
+ (reduce concat
+ (for [port-dir (.listFiles topo-dir)]
+ (into [] (DirectoryCleaner/getFilesForDir port-dir))))
+ []))
+ (let [port-dir (get-worker-dir-from-root log-root topoId port)]
+ (if (.exists port-dir)
+ (into [] (DirectoryCleaner/getFilesForDir port-dir))
+ []))))
+ file-strs (sort (for [file file-results]
+ (get-topo-port-workerlog file)))]
+ (json-response file-strs
+ callback
+ :headers {"Access-Control-Allow-Origin" origin
+ "Access-Control-Allow-Credentials" "true"})))
+
+(defn get-profiler-dump-files
+ [dir]
+ (filter (comp not nil?)
+ (for [f (DirectoryCleaner/getFilesForDir dir)]
+ (let [name (.getName f)]
+ (if (or
+ (.endsWith name ".txt")
+ (.endsWith name ".jfr")
+ (.endsWith name ".bin"))
+ (.getName f))))))
+
+(defroutes log-routes
+ (GET "/log" [:as req & m]
+ (try
+ (mark! logviewer:num-log-page-http-requests)
+ (let [servlet-request (:servlet-request req)
+ log-root (:log-root req)
+ user (.getUserName http-creds-handler servlet-request)
+ start (if (:start m) (parse-long-from-map m :start))
+ length (if (:length m) (parse-long-from-map m :length))
+ file (url-decode (:file m))]
+ (log-template (log-page file start length (:grep m) user log-root)
+ file user))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
+ (GET "/dumps/:topo-id/:host-port/:filename"
+ [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port filename & m]
+ (let [user (.getUserName http-creds-handler servlet-request)
+ port (second (split host-port #":"))
+ dir (File. (str log-root
+ file-path-separator
+ topo-id
+ file-path-separator
+ port))
+ file (File. (str log-root
+ file-path-separator
+ topo-id
+ file-path-separator
+ port
+ file-path-separator
+ filename))]
+ (if (and (.exists dir) (.exists file))
+ (if (or (blank? (*STORM-CONF* UI-FILTER))
+ (authorized-log-user? user
+ (str topo-id file-path-separator port file-path-separator "worker.log")
+ *STORM-CONF*))
+ (-> (resp/response file)
+ (resp/content-type "application/octet-stream"))
+ (unauthorized-user-html user))
+ (-> (resp/response "Page not found")
+ (resp/status 404)))))
+ (GET "/dumps/:topo-id/:host-port"
+ [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port & m]
+ (let [user (.getUserName http-creds-handler servlet-request)
+ port (second (split host-port #":"))
+ dir (File. (str log-root
+ file-path-separator
+ topo-id
+ file-path-separator
+ port))]
+ (if (.exists dir)
+ (if (or (blank? (*STORM-CONF* UI-FILTER))
+ (authorized-log-user? user
+ (str topo-id file-path-separator port file-path-separator "worker.log")
+ *STORM-CONF*))
+ (html4
+ [:head
+ [:title "File Dumps - Storm Log Viewer"]
+ (include-css "/css/bootstrap-3.3.1.min.css")
+ (include-css "/css/jquery.dataTables.1.10.4.min.css")
+ (include-css "/css/style.css")]
+ [:body
+ [:ul
+ (for [file (get-profiler-dump-files dir)]
+ [:li
+ [:a {:href (str "/dumps/" topo-id "/" host-port "/" file)}
file ]])]])
+ (unauthorized-user-html user))
+ (-> (resp/response "Page not found")
+ (resp/status 404)))))
+ (GET "/daemonlog" [:as req & m]
+ (try
+ (mark! logviewer:num-daemonlog-page-http-requests)
+ (let [servlet-request (:servlet-request req)
+ daemonlog-root (:daemonlog-root req)
+ user (.getUserName http-creds-handler servlet-request)
+ start (if (:start m) (parse-long-from-map m :start))
+ length (if (:length m) (parse-long-from-map m :length))
+ file (url-decode (:file m))]
+ (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
+ file user))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
+ (GET "/download/:file" [:as {:keys [servlet-request servlet-response
log-root]} file & m]
+ (try
+ (mark! logviewer:num-download-log-file-http-requests)
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (download-log-file file servlet-request servlet-response user log-root))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
+ (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response
daemonlog-root]} file & m]
+ (try
+ (mark! logviewer:num-download-log-daemon-file-http-requests)
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (download-log-file file servlet-request servlet-response user daemonlog-root))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
+ (GET "/search/:file" [:as {:keys [servlet-request servlet-response
log-root]} file & m]
+ ;; We do not use servlet-response here, but do not remove it from the
+ ;; :keys list, or this rule could stop working when an authentication
+ ;; filter is configured.
+ (try
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (search-log-file (url-decode file)
+ user
+ log-root
+ (:search-string m)
+ (:num-matches m)
+ (:start-byte-offset m)
+ (:callback m)
+ (.getHeader servlet-request "Origin")))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (json-response (exception->json ex) (:callback m) :status 400))))
+ (GET "/deepSearch/:topo-id" [:as {:keys [servlet-request servlet-response
log-root]} topo-id & m]
+ ;; We do not use servlet-response here, but do not remove it from the
+ ;; :keys list, or this rule could stop working when an authentication
+ ;; filter is configured.
+ (try
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (deep-search-logs-for-topology topo-id
+ user
+ log-root
+ (:search-string m)
+ (:num-matches m)
+ (:port m)
+ (:start-file-offset m)
+ (:start-byte-offset m)
+ (:search-archived m)
+ (:callback m)
+ (.getHeader servlet-request "Origin")))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (json-response (exception->json ex) (:callback m) :status 400))))
+ (GET "/searchLogs" [:as req & m]
+ (try
+ (let [servlet-request (:servlet-request req)
+ user (.getUserName http-creds-handler servlet-request)]
+ (list-log-files user
+ (:topoId m)
+ (:port m)
+ (:log-root req)
+ (:callback m)
+ (.getHeader servlet-request "Origin")))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (json-response (exception->json ex) (:callback m) :status 400))))
+ (GET "/listLogs" [:as req & m]
+ (try
+ (mark! logviewer:num-list-logs-http-requests)
+ (let [servlet-request (:servlet-request req)
+ user (.getUserName http-creds-handler servlet-request)]
+ (list-log-files user
+ (:topoId m)
+ (:port m)
+ (:log-root req)
+ (:callback m)
+ (.getHeader servlet-request "Origin")))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (json-response (exception->json ex) (:callback m) :status 400))))
+ (route/resources "/")
+ (route/not-found "Page not found"))
+
+(defn conf-middleware
+ "For passing the storm configuration with each request."
+ [app log-root daemonlog-root]
+ (fn [req]
+ (app (assoc req :log-root log-root :daemonlog-root daemonlog-root))))
+
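+;; Sketch of the middleware with hypothetical directories: each request gains
+;; :log-root and :daemonlog-root keys before reaching the routes.
+(comment
+  (let [wrapped (conf-middleware identity "/var/storm/workers-artifacts" "/var/log/storm")]
+    (:log-root (wrapped {}))) ;; => "/var/storm/workers-artifacts"
+  )
+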
+(defn start-logviewer! [conf log-root-dir daemonlog-root-dir]
+ (try
+ (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
+ filter-class (conf UI-FILTER)
+ filter-params (conf UI-FILTER-PARAMS)
+ logapp (handler/api (-> log-routes
+ requests-middleware)) ;; query params as map
+ middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
+ filters-confs (if (conf UI-FILTER)
+ [{:filter-class filter-class
+ :filter-params (or (conf UI-FILTER-PARAMS) {})}]
+ [])
+ filters-confs (concat filters-confs
+ [{:filter-class "org.eclipse.jetty.servlets.GzipFilter"
+ :filter-name "Gzipper"
+ :filter-params {}}])
+ https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0))
+ keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH)
+ keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD)
+ keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE)
+ key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD)
+ truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH)
+ truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD)
+ truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE)
+ want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH)
+ need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)]
+ (storm-run-jetty {:port (int (conf LOGVIEWER-PORT))
+ :configurator (fn [server]
+ (config-ssl server
+ https-port
+ keystore-path
+ keystore-pass
+ keystore-type
+ key-password
+ truststore-path
+ truststore-password
+ truststore-type
+ want-client-auth
+ need-client-auth)
+ (config-filter server middle filters-confs))}))
+ (catch Exception ex
+ (log-error ex))))
+
+(defn -main []
+ (let [conf (read-storm-config)
+ log-root (worker-artifacts-root conf)
+ daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
+ (setup-default-uncaught-exception-handler)
+ (start-log-cleaner! conf log-root)
+ (log-message "Starting logviewer server for storm version '"
+ STORM-VERSION
+ "'")
+ (start-logviewer! conf log-root daemonlog-root)
+ (start-metrics-reporters)))