http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj 
b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
new file mode 100644
index 0000000..39a9c12
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -0,0 +1,1199 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.logviewer
+  (:use compojure.core)
+  (:use [clojure.set :only [difference intersection]])
+  (:use [clojure.string :only [blank? split]])
+  (:use [hiccup core page-helpers form-helpers])
+  (:use [org.apache.storm config util log timer])
+  (:use [org.apache.storm.ui helpers])
+  (:import [org.apache.storm.utils Utils VersionInfo])
+  (:import [org.slf4j LoggerFactory])
+  (:import [java.util Arrays ArrayList HashSet])
+  (:import [java.util.zip GZIPInputStream])
+  (:import [org.apache.logging.log4j LogManager])
+  (:import [org.apache.logging.log4j.core Appender LoggerContext])
+  (:import [org.apache.logging.log4j.core.appender RollingFileAppender])
+  (:import [java.io BufferedInputStream File FileFilter FileInputStream
+            InputStream InputStreamReader])
+  (:import [java.nio.file Files Path Paths DirectoryStream])
+  (:import [java.nio ByteBuffer])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.daemon DirectoryCleaner])
+  (:import [org.yaml.snakeyaml Yaml]
+           [org.yaml.snakeyaml.constructor SafeConstructor])
+  (:import [org.apache.storm.ui InvalidRequestException]
+           [org.apache.storm.security.auth AuthUtils])
+  (:require [org.apache.storm.daemon common [supervisor :as supervisor]])
+  (:require [compojure.route :as route]
+            [compojure.handler :as handler]
+            [ring.middleware.keyword-params]
+            [ring.util.codec :as codec]
+            [ring.util.response :as resp]
+            [clojure.string :as string])
+  (:require [metrics.meters :refer [defmeter mark!]])
+  (:use [org.apache.storm.daemon.common :only [start-metrics-reporters]])
+  (:gen-class))
+
+(def ^:dynamic *STORM-CONF* (read-storm-config))
+(def STORM-VERSION (VersionInfo/getVersion))
+
+(defmeter logviewer:num-log-page-http-requests)
+(defmeter logviewer:num-daemonlog-page-http-requests)
+(defmeter logviewer:num-download-log-file-http-requests)
+(defmeter logviewer:num-download-log-daemon-file-http-requests)
+(defmeter logviewer:num-list-logs-http-requests)
+
+(defn cleanup-cutoff-age-millis [conf now-millis]
+  (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
+
+(defn get-stream-for-dir
+  [^File f]
+  (try (Files/newDirectoryStream (.toPath f))
+    (catch Exception ex (log-error ex) nil)))
+
+(defn- last-modifiedtime-worker-logdir
+  "Return the last modified time for all log files in a worker's log dir.
+  Using stream rather than File.listFiles is to avoid large mem usage
+  when a directory has too many files"
+  [^File log-dir]
+  (let [^DirectoryStream stream (get-stream-for-dir log-dir)
+        dir-modified (.lastModified log-dir)
+        last-modified (try (reduce
+                        (fn [maximum path]
+                          (let [curr (.lastModified (.toFile path))]
+                            (if (> curr maximum)
+                              curr
+                              maximum)))
+                        dir-modified
+                        stream)
+                        (catch Exception ex
+                          (log-error ex) dir-modified)
+                        (finally
+                          (if (instance? DirectoryStream stream)
+                            (.close stream))))]
+    last-modified))
+
+(defn get-size-for-logdir
+  "Return the sum of lengths for all log files in a worker's log dir.
+   Using stream rather than File.listFiles is to avoid large mem usage
+   when a directory has too many files"
+  [log-dir]
+  (let [^DirectoryStream stream (get-stream-for-dir log-dir)]
+    (reduce
+      (fn [sum path]
+        (let [size (.length (.toFile path))]
+          (+ sum size)))
+      0
+      stream)))
+
+(defn mk-FileFilter-for-log-cleanup [conf now-millis]
+  (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
+    (reify FileFilter (^boolean accept [this ^File file]
+                        (boolean (and
+                                   (not (.isFile file))
+                                   (<= (last-modifiedtime-worker-logdir file) 
cutoff-age-millis)))))))
+
+(defn select-dirs-for-cleanup [conf now-millis root-dir]
+  (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
+    (reduce clojure.set/union
+            (sorted-set)
+            (for [^File topo-dir (.listFiles (File. root-dir))]
+              (into [] (.listFiles topo-dir file-filter))))))
+
+(defn get-topo-port-workerlog
+  "Return the path of the worker log with the format of 
topoId/port/worker.log.*"
+  [^File file]
+  (clojure.string/join file-path-separator
+                       (take-last 3
+                                  (split (.getCanonicalPath file) (re-pattern 
file-path-separator)))))
+
+(defn get-metadata-file-for-log-root-name [root-name root-dir]
+  (let [metaFile (clojure.java.io/file root-dir "metadata"
+                                       (str root-name ".yaml"))]
+    (if (.exists metaFile)
+      metaFile
+      (do
+        (log-warn "Could not find " (.getCanonicalPath metaFile)
+                  " to clean up for " root-name)
+        nil))))
+
+(defn get-metadata-file-for-wroker-logdir [logdir]
+  (let [metaFile (clojure.java.io/file logdir "worker.yaml")]
+    (if (.exists metaFile)
+      metaFile
+      (do
+        (log-warn "Could not find " (.getCanonicalPath metaFile)
+                  " to clean up for " logdir)
+        nil))))
+
+(defn get-worker-id-from-metadata-file [metaFile]
+  (get (clojure-from-yaml-file metaFile) "worker-id"))
+
+(defn get-topo-owner-from-metadata-file [metaFile]
+  (get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
+
+(defn identify-worker-log-dirs [log-dirs]
+  "return the workerid to worker-log-dir map"
+  (into {} (for [logdir log-dirs
+                 :let [metaFile (get-metadata-file-for-wroker-logdir logdir)]
+                 :when metaFile]
+             {(get-worker-id-from-metadata-file metaFile) logdir})))
+
+(defn get-alive-ids
+  [conf now-secs]
+  (->>
+    (supervisor/read-worker-heartbeats conf)
+    (remove
+      #(or (not (val %))
+           (supervisor/is-worker-hb-timed-out? now-secs
+                                               (val %)
+                                               conf)))
+    keys
+    set))
+
+(defn get-dead-worker-dirs
+  "Return a sorted set of java.io.Files that were written by workers that are
+  now dead"
+  [conf now-secs log-dirs]
+  (if (empty? log-dirs)
+    (sorted-set)
+    (let [alive-ids (get-alive-ids conf now-secs)
+          id->dir (identify-worker-log-dirs log-dirs)]
+      (apply sorted-set
+             (for [[id dir] id->dir
+                   :when (not (contains? alive-ids id))]
+               dir)))))
+
+(defn get-all-worker-dirs [^File root-dir]
+  (reduce clojure.set/union
+          (sorted-set)
+          (for [^File topo-dir (.listFiles root-dir)]
+            (into [] (.listFiles topo-dir)))))
+
+(defn get-alive-worker-dirs
+  "Return a sorted set of java.io.Files that were written by workers that are
+  now active"
+  [conf root-dir]
+  (let [alive-ids (get-alive-ids conf (current-time-secs))
+        log-dirs (get-all-worker-dirs root-dir)
+        id->dir (identify-worker-log-dirs log-dirs)]
+    (apply sorted-set
+           (for [[id dir] id->dir
+                 :when (contains? alive-ids id)]
+             (.getCanonicalPath dir)))))
+
+(defn get-all-logs-for-rootdir [^File log-dir]
+  (reduce concat
+          (for [port-dir (get-all-worker-dirs log-dir)]
+            (into [] (DirectoryCleaner/getFilesForDir port-dir)))))
+
+(defn is-active-log [^File file]
+  (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file)))
+
+(defn sum-file-size
+  "Given a sequence of Files, sum their sizes."
+  [files]
+  (reduce #(+ %1 (.length %2)) 0 files))
+
+(defn per-workerdir-cleanup!
+  "Delete the oldest files in each overloaded worker log dir"
+  [^File root-dir size ^DirectoryCleaner cleaner]
+  (dofor [worker-dir (get-all-worker-dirs root-dir)]
+    (.deleteOldestWhileTooLarge cleaner (ArrayList. [worker-dir]) size true 
nil)))
+
+(defn global-log-cleanup!
+  "Delete the oldest files in overloaded worker-artifacts globally"
+  [^File root-dir size ^DirectoryCleaner cleaner]
+  (let [worker-dirs (ArrayList. (get-all-worker-dirs root-dir))
+        alive-worker-dirs (HashSet. (get-alive-worker-dirs *STORM-CONF* 
root-dir))]
+    (.deleteOldestWhileTooLarge cleaner worker-dirs size false 
alive-worker-dirs)))
+
+(defn cleanup-empty-topodir!
+  "Delete the topo dir if it contains zero port dirs"
+  [^File dir]
+  (let [topodir (.getParentFile dir)]
+    (if (empty? (.listFiles topodir))
+      (rmr (.getCanonicalPath topodir)))))
+
+(defn cleanup-fn!
+  "Delete old log dirs for which the workers are no longer alive"
+  [log-root-dir]
+  (let [now-secs (current-time-secs)
+        old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
+                                              (* now-secs 1000)
+                                              log-root-dir)
+        total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB)
+        per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB)
+        per-dir-size (min per-dir-size (* total-size 0.5))
+        cleaner (DirectoryCleaner.)
+        dead-worker-dirs (get-dead-worker-dirs *STORM-CONF*
+                                               now-secs
+                                               old-log-dirs)]
+    (log-debug "log cleanup: now=" now-secs
+               " old log dirs " (pr-str (map #(.getName %) old-log-dirs))
+               " dead worker dirs " (pr-str
+                                       (map #(.getName %) dead-worker-dirs)))
+    (dofor [dir dead-worker-dirs]
+           (let [path (.getCanonicalPath dir)]
+             (log-message "Cleaning up: Removing " path)
+             (try (rmr path)
+                  (cleanup-empty-topodir! dir)
+                  (catch Exception ex (log-error ex)))))
+    (per-workerdir-cleanup! (File. log-root-dir) (* per-dir-size (* 1024 
1024)) cleaner)
+    (let [size (* total-size (* 1024 1024))]
+      (global-log-cleanup! (File. log-root-dir) size cleaner))))
+
+(defn start-log-cleaner! [conf log-root-dir]
+  (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
+    (when interval-secs
+      (log-debug "starting log cleanup thread at interval: " interval-secs)
+      (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
+                                    :kill-fn (fn [t]
+                                               (log-error t "Error when doing 
logs cleanup")
+                                               (exit-process! 20 "Error when 
doing log cleanup")))
+                          0 ;; Start immediately.
+                          interval-secs
+                          (fn [] (cleanup-fn! log-root-dir))))))
+
+(defn- skip-bytes
+  "FileInputStream#skip may not work the first time, so ensure it successfully
+  skips the given number of bytes."
+  [^InputStream stream n]
+  (loop [skipped 0]
+    (let [skipped (+ skipped (.skip stream (- n skipped)))]
+      (if (< skipped n) (recur skipped)))))
+
+(defn logfile-matches-filter?
+  [log-file-name]
+  (let [regex-string (str "worker.log.*")
+        regex-pattern (re-pattern regex-string)]
+    (not= (re-seq regex-pattern (.toString log-file-name)) nil)))
+
+(defn page-file
+  ([path tail]
+    (let [zip-file? (.endsWith path ".gz")
+          flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) 
(.length (clojure.java.io/file path)))
+          skip (- flen tail)]
+      (page-file path skip tail)))
+  ([path start length]
+    (let [zip-file? (.endsWith path ".gz")
+          flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) 
(.length (clojure.java.io/file path)))]
+      (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. 
path)) (FileInputStream. path))
+                  output (java.io.ByteArrayOutputStream.)]
+        (if (>= start flen)
+          (throw
+            (InvalidRequestException. "Cannot start past the end of the 
file")))
+        (if (> start 0) (skip-bytes input start))
+        (let [buffer (make-array Byte/TYPE 1024)]
+          (loop []
+            (when (< (.size output) length)
+              (let [size (.read input buffer 0 (min 1024 (- length (.size 
output))))]
+                (when (pos? size)
+                  (.write output buffer 0 size)
+                  (recur)))))
+        (.toString output))))))
+
+(defn get-log-user-group-whitelist [fname]
+  (let [wl-file (get-log-metadata-file fname)
+        m (clojure-from-yaml-file wl-file)]
+    (if (not-nil? m)
+      (do
+        (let [user-wl (.get m LOGS-USERS)
+              user-wl (if user-wl user-wl [])
+              group-wl (.get m LOGS-GROUPS)
+              group-wl (if group-wl group-wl [])]
+          [user-wl group-wl]))
+        nil)))
+
+(def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin 
*STORM-CONF*))
+(defn user-groups
+  [user]
+  (if (blank? user) [] (.getGroups igroup-mapper user)))
+
+(defn authorized-log-user? [user fname conf]
+  (if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist 
fname)))
+    nil
+    (let [groups (user-groups user)
+          [user-wl group-wl] (get-log-user-group-whitelist fname)
+          logs-users (concat (conf LOGS-USERS)
+                             (conf NIMBUS-ADMINS)
+                             user-wl)
+          logs-groups (concat (conf LOGS-GROUPS)
+                              group-wl)]
+       (or (some #(= % user) logs-users)
+           (< 0 (.size (intersection (set groups) (set logs-groups))))))))
+
+(defn log-root-dir
+  "Given an appender name, as configured, get the parent directory of the 
appender's log file.
+   Note that if anything goes wrong, this will throw an Error and exit."
+  [appender-name]
+  (let [appender (.getAppender (.getConfiguration (LogManager/getContext)) 
appender-name)]
+    (if (and appender-name appender (instance? RollingFileAppender appender))
+      (.getParent (File. (.getFileName appender)))
+      (throw
+       (RuntimeException. "Log viewer could not find configured appender, or 
the appender is not a FileAppender. Please check that the appender name 
configured in storm and log4j agree.")))))
+
+(defnk to-btn-link
+  "Create a link that is formatted like a button"
+  [url text :enabled true]
+  [:a {:href (java.net.URI. url)
+       :class (str "btn btn-default " (if enabled "enabled" "disabled"))} 
text])
+
+(defn search-file-form [fname]
+  [[:form {:action "logviewer_search.html" :id "search-box"}
+    "Search this file:"
+    [:input {:type "text" :name "search"}]
+    [:input {:type "hidden" :name "file" :value fname}]
+    [:input {:type "submit" :value "Search"}]]])
+
+(defn log-file-selection-form [log-files type]
+  [[:form {:action type :id "list-of-files"}
+    (drop-down "file" log-files)
+    [:input {:type "submit" :value "Switch file"}]]])
+
+(defn pager-links [fname start length file-size]
+  (let [prev-start (max 0 (- start length))
+        next-start (if (> file-size 0)
+                     (min (max 0 (- file-size length)) (+ start length))
+                     (+ start length))]
+    [[:div
+      (concat
+          [(to-btn-link (url "/log"
+                          {:file fname
+                           :start (max 0 (- start length))
+                           :length length})
+                          "Prev" :enabled (< prev-start start))]
+          [(to-btn-link (url "/log"
+                           {:file fname
+                            :start 0
+                            :length length}) "First")]
+          [(to-btn-link (url "/log"
+                           {:file fname
+                            :length length})
+                        "Last")]
+          [(to-btn-link (url "/log"
+                          {:file fname
+                           :start (min (max 0 (- file-size length))
+                                       (+ start length))
+                           :length length})
+                        "Next" :enabled (> next-start start))])]]))
+
+(defn- download-link [fname]
+  [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
+
+(defn- daemon-download-link [fname]
+  [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full 
File")]])
+
+(defn- is-txt-file [fname]
+  (re-find #"\.(log.*|txt|yaml|pid)$" fname))
+
+(def default-bytes-per-page 51200)
+
+(defn log-page [fname start length grep user root-dir]
+  (if (or (blank? (*STORM-CONF* UI-FILTER))
+          (authorized-log-user? user fname *STORM-CONF*))
+    (let [file (.getCanonicalFile (File. root-dir fname))
+          path (.getCanonicalPath file)
+          zip-file? (.endsWith path ".gz")
+          topo-dir (.getParentFile (.getParentFile file))]
+      (if (and (.exists file)
+               (= (.getCanonicalFile (File. root-dir))
+                  (.getParentFile topo-dir)))
+        (let [file-length (if zip-file? (Utils/zipFileSize 
(clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+              log-files (reduce clojure.set/union
+                          (sorted-set)
+                          (for [^File port-dir (.listFiles topo-dir)]
+                            (into [] (filter #(.isFile %) 
(DirectoryCleaner/getFilesForDir port-dir))))) ;all types of files included
+              files-str (for [file log-files]
+                          (get-topo-port-workerlog file))
+              reordered-files-str (conj (filter #(not= fname %) files-str) 
fname)
+               length (if length
+                       (min 10485760 length)
+                       default-bytes-per-page)
+              log-string (escape-html
+                           (if (is-txt-file fname)
+                             (if start
+                               (page-file path start length)
+                               (page-file path length))
+                             "This is a binary file and cannot display! You 
may download the full file."))
+              start (or start (- file-length length))]
+          (if grep
+            (html [:pre#logContent
+                   (if grep
+                     (->> (.split log-string "\n")
+                          (filter #(.contains % grep))
+                          (string/join "\n"))
+                     log-string)])
+            (let [pager-data (if (is-txt-file fname) (pager-links fname start 
length file-length) nil)]
+              (html (concat (search-file-form fname)
+                            (log-file-selection-form reordered-files-str 
"log") ; list all files for this topology
+                            pager-data
+                            (download-link fname)
+                            [[:pre#logContent log-string]]
+                            pager-data)))))
+        (-> (resp/response "Page not found")
+            (resp/status 404))))
+    (if (nil? (get-log-user-group-whitelist fname))
+      (-> (resp/response "Page not found")
+        (resp/status 404))
+      (unauthorized-user-html user))))
+
+(defn daemonlog-page [fname start length grep user root-dir]
+  (let [file (.getCanonicalFile (File. root-dir fname))
+        file-length (.length file)
+        path (.getCanonicalPath file)
+        zip-file? (.endsWith path ".gz")]
+    (if (and (= (.getCanonicalFile (File. root-dir))
+                (.getParentFile file))
+             (.exists file))
+      (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file 
path)) (.length (clojure.java.io/file path)))
+            length (if length
+                     (min 10485760 length)
+                     default-bytes-per-page)
+            log-files (into [] (filter #(.isFile %) (.listFiles (File. 
root-dir)))) ;all types of files included
+            files-str (for [file log-files]
+                        (.getName file))
+            reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+            log-string (escape-html
+                         (if (is-txt-file fname)
+                           (if start
+                             (page-file path start length)
+                             (page-file path length))
+                           "This is a binary file and cannot display! You may 
download the full file."))
+            start (or start (- file-length length))]
+        (if grep
+          (html [:pre#logContent
+                 (if grep
+                   (->> (.split log-string "\n")
+                        (filter #(.contains % grep))
+                        (string/join "\n"))
+                   log-string)])
+          (let [pager-data (if (is-txt-file fname) (pager-links fname start 
length file-length) nil)]
+            (html (concat (log-file-selection-form reordered-files-str 
"daemonlog") ; list all daemon logs
+                          pager-data
+                          (daemon-download-link fname)
+                          [[:pre#logContent log-string]]
+                          pager-data)))))
+      (-> (resp/response "Page not found")
+          (resp/status 404)))))
+
+(defn download-log-file [fname req resp user ^String root-dir]
+  (let [file (.getCanonicalFile (File. root-dir fname))]
+    (if (.exists file)
+      (if (or (blank? (*STORM-CONF* UI-FILTER))
+              (authorized-log-user? user fname *STORM-CONF*))
+        (-> (resp/response file)
+            (resp/content-type "application/octet-stream"))
+        (unauthorized-user-html user))
+      (-> (resp/response "Page not found")
+          (resp/status 404)))))
+
+(def grep-max-search-size 1024)
+(def grep-buf-size 2048)
+(def grep-context-size 128)
+
+(defn logviewer-port
+  []
+  (int (*STORM-CONF* LOGVIEWER-PORT)))
+
+(defn url-to-match-centered-in-log-page
+  [needle fname offset port]
+  (let [host (local-hostname)
+        port (logviewer-port)
+        fname (clojure.string/join file-path-separator (take-last 3 (split 
fname (re-pattern file-path-separator))))]
+    (url (str "http://"; host ":" port "/log")
+      {:file fname
+       :start (max 0
+                (- offset
+                  (int (/ default-bytes-per-page 2))
+                  (int (/ (alength needle) -2)))) ;; Addition
+       :length default-bytes-per-page})))
+
+(defnk mk-match-data
+  [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname
+   :before-bytes nil :after-bytes nil]
+  (let [url (url-to-match-centered-in-log-page needle
+              fname
+              file-offset
+              (*STORM-CONF* LOGVIEWER-PORT))
+        haystack-bytes (.array haystack)
+        before-string (if (>= haystack-offset grep-context-size)
+                        (String. haystack-bytes
+                          (- haystack-offset grep-context-size)
+                          grep-context-size
+                          "UTF-8")
+                        (let [num-desired (max 0 (- grep-context-size
+                                                   haystack-offset))
+                              before-size (if before-bytes
+                                            (alength before-bytes)
+                                            0)
+                              num-expected (min before-size num-desired)]
+                          (if (pos? num-expected)
+                            (str (String. before-bytes
+                                   (- before-size num-expected)
+                                   num-expected
+                                   "UTF-8")
+                              (String. haystack-bytes
+                                0
+                                haystack-offset
+                                "UTF-8"))
+                            (String. haystack-bytes
+                              0
+                              haystack-offset
+                              "UTF-8"))))
+        after-string (let [needle-size (alength needle)
+                           after-offset (+ haystack-offset needle-size)
+                           haystack-size (.limit haystack)]
+                       (if (< (+ after-offset grep-context-size) haystack-size)
+                         (String. haystack-bytes
+                           after-offset
+                           grep-context-size
+                           "UTF-8")
+                         (let [num-desired (- grep-context-size
+                                             (- haystack-size after-offset))
+                               after-size (if after-bytes
+                                            (alength after-bytes)
+                                            0)
+                               num-expected (min after-size num-desired)]
+                           (if (pos? num-expected)
+                             (str (String. haystack-bytes
+                                    after-offset
+                                    (- haystack-size after-offset)
+                                    "UTF-8")
+                               (String. after-bytes 0 num-expected "UTF-8"))
+                             (String. haystack-bytes
+                               after-offset
+                               (- haystack-size after-offset)
+                               "UTF-8")))))]
+    {"byteOffset" file-offset
+     "beforeString" before-string
+     "afterString" after-string
+     "matchString" (String. needle "UTF-8")
+     "logviewerURL" url}))
+
+(defn- try-read-ahead!
+  "Tries once to read ahead in the stream to fill the context and resets the
+  stream to its position before the call."
+  [^BufferedInputStream stream haystack offset file-len bytes-read]
+  (let [num-expected (min (- file-len bytes-read)
+                       grep-context-size)
+        after-bytes (byte-array num-expected)]
+    (.mark stream num-expected)
+    ;; Only try reading once.
+    (.read stream after-bytes 0 num-expected)
+    (.reset stream)
+    after-bytes))
+
+(defn offset-of-bytes
+  "Searches a given byte array for a match of a sub-array of bytes.  Returns
+  the offset to the byte that matches, or -1 if no match was found."
+  [^bytes buf ^bytes value init-offset]
+  {:pre [(> (alength value) 0)
+         (not (neg? init-offset))]}
+  (loop [offset init-offset
+         candidate-offset init-offset
+         val-offset 0]
+    (if-not (pos? (- (alength value) val-offset))
+      ;; Found
+      candidate-offset
+      (if (>= offset (alength buf))
+        ;; We ran out of buffer for the search.
+        -1
+        (if (not= (aget value val-offset) (aget buf offset))
+          ;; The match at this candidate offset failed, so start over with the
+          ;; next candidate byte from the buffer.
+          (let [new-offset (inc candidate-offset)]
+            (recur new-offset new-offset 0))
+          ;; So far it matches.  Keep going...
+          (recur (inc offset) candidate-offset (inc val-offset)))))))
+
+(defn- buffer-substring-search!
+  "As the file is read into a buffer, 1/2 the buffer's size at a time, we
+  search the buffer for matches of the substring and return a list of zero or
+  more matches."
+  [file file-len offset-to-buf init-buf-offset stream bytes-skipped
+   bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches
+   ^bytes before-bytes]
+  (loop [buf-offset init-buf-offset
+         matches initial-matches]
+    (let [offset (offset-of-bytes (.array haystack) needle buf-offset)]
+      (if (and (< (count matches) num-matches) (not (neg? offset)))
+        (let [file-offset (+ offset-to-buf offset)
+              bytes-needed-after-match (- (.limit haystack)
+                                         grep-context-size
+                                         (alength needle))
+              before-arg (if (< offset grep-context-size) before-bytes)
+              after-arg (if (> offset bytes-needed-after-match)
+                          (try-read-ahead! stream
+                            haystack
+                            offset
+                            file-len
+                            bytes-read))]
+          (recur (+ offset (alength needle))
+            (conj matches
+              (mk-match-data needle
+                haystack
+                offset
+                file-offset
+                (.getCanonicalPath file)
+                :before-bytes before-arg
+                :after-bytes after-arg))))
+        (let [before-str-to-offset (min (.limit haystack)
+                                     grep-max-search-size)
+              before-str-from-offset (max 0 (- before-str-to-offset
+                                              grep-context-size))
+              new-before-bytes (Arrays/copyOfRange (.array haystack)
+                                 before-str-from-offset
+                                 before-str-to-offset)
+              ;; It's OK if new-byte-offset is negative.  This is normal if
+              ;; we are out of bytes to read from a small file.
+              new-byte-offset (if (>= (count matches) num-matches)
+                                (+ (get (last matches) "byteOffset")
+                                  (alength needle))
+                                (+ bytes-skipped
+                                  bytes-read
+                                  (- grep-max-search-size)))]
+          [matches new-byte-offset new-before-bytes])))))
+
+(defn- mk-grep-response
+  "This response data only includes a next byte offset if there is more of the
+  file to read."
+  [search-bytes offset matches next-byte-offset]
+  (merge {"searchString" (String. search-bytes "UTF-8")
+          "startByteOffset" offset
+          "matches" matches}
+    (and next-byte-offset {"nextByteOffset" next-byte-offset})))
+
+(defn rotate-grep-buffer!
+  [^ByteBuffer buf ^BufferedInputStream stream total-bytes-read file file-len]
+  (let [buf-arr (.array buf)]
+    ;; Copy the 2nd half of the buffer to the first half.
+    (System/arraycopy buf-arr
+      grep-max-search-size
+      buf-arr
+      0
+      grep-max-search-size)
+    ;; Zero-out the 2nd half to prevent accidental matches.
+    (Arrays/fill buf-arr
+      grep-max-search-size
+      (count buf-arr)
+      (byte 0))
+    ;; Fill the 2nd half with new bytes from the stream.
+    (let [bytes-read (.read stream
+                       buf-arr
+                       grep-max-search-size
+                       (min file-len grep-max-search-size))]
+      (.limit buf (+ grep-max-search-size bytes-read))
+      (swap! total-bytes-read + bytes-read))))
+
+(defnk substring-search
+  "Searches for a substring in a log file, starting at the given offset,
+  returning the given number of matches, surrounded by the given number of
+  context lines.  Other information is included to be useful for progressively
+  searching through a file for display in a UI. The search string must
+  grep-max-search-size bytes or fewer when decoded with UTF-8."
+  [file ^String search-string :num-matches 10 :start-byte-offset 0]
+  {:pre [(not (empty? search-string))
+         (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]}
+  (let [zip-file? (.endsWith (.getName file) ".gz")
+        f-input-steam (FileInputStream. file)
+        gzipped-input-stream (if zip-file?
+                               (GZIPInputStream. f-input-steam)
+                               f-input-steam)
+        stream ^BufferedInputStream (BufferedInputStream.
+                                      gzipped-input-stream)
+        file-len (if zip-file? (Utils/zipFileSize file) (.length file))
+        buf ^ByteBuffer (ByteBuffer/allocate grep-buf-size)
+        buf-arr ^bytes (.array buf)
+        string nil
+        total-bytes-read (atom 0)
+        matches []
+        search-bytes ^bytes (.getBytes search-string "UTF-8")
+        num-matches (or num-matches 10)
+        start-byte-offset (or start-byte-offset 0)]
+    ;; Start at the part of the log file we are interested in.
+    ;; Allow searching when start-byte-offset == file-len so it doesn't blow 
up on 0-length files
+    (if (> start-byte-offset file-len)
+      (throw
+        (InvalidRequestException. "Cannot search past the end of the file")))
+    (when (> start-byte-offset 0)
+      (skip-bytes stream start-byte-offset))
+    (java.util.Arrays/fill buf-arr (byte 0))
+    (let [bytes-read (.read stream buf-arr 0 (min file-len grep-buf-size))]
+      (.limit buf bytes-read)
+      (swap! total-bytes-read + bytes-read))
+    (loop [initial-matches []
+           init-buf-offset 0
+           byte-offset start-byte-offset
+           before-bytes nil]
+      (let [[matches new-byte-offset new-before-bytes]
+            (buffer-substring-search! file
+              file-len
+              byte-offset
+              init-buf-offset
+              stream
+              start-byte-offset
+              @total-bytes-read
+              buf
+              search-bytes
+              initial-matches
+              num-matches
+              before-bytes)]
+        (if (and (< (count matches) num-matches)
+              (< (+ @total-bytes-read start-byte-offset) file-len))
+          (let [;; The start index is positioned to find any possible
+                ;; occurrence search string that did not quite fit in the
+                ;; buffer on the previous read.
+                new-buf-offset (- (min (.limit ^ByteBuffer buf)
+                                    grep-max-search-size)
+                                 (alength search-bytes))]
+            (rotate-grep-buffer! buf stream total-bytes-read file file-len)
+            (when (< @total-bytes-read 0)
+              (throw (InvalidRequestException. "Cannot search past the end of 
the file")))
+            (recur matches
+              new-buf-offset
+              new-byte-offset
+              new-before-bytes))
+          (mk-grep-response search-bytes
+            start-byte-offset
+            matches
+            (if-not (and (< (count matches) num-matches)
+                      (>= @total-bytes-read file-len))
+              (let [next-byte-offset (+ (get (last matches)
+                                          "byteOffset")
+                                       (alength search-bytes))]
+                (if (> file-len next-byte-offset)
+                  next-byte-offset)))))))))
+
+(defn- try-parse-int-param
+  [nam value]
+  (try
+    (Integer/parseInt value)
+    (catch java.lang.NumberFormatException e
+      (->
+        (str "Could not parse " nam " to an integer")
+        (InvalidRequestException. e)
+        throw))))
+
+(defn search-log-file
+  [fname user ^String root-dir search num-matches offset callback origin]
+  (let [file (.getCanonicalFile (File. root-dir fname))]
+    (if (.exists file)
+      (if (or (blank? (*STORM-CONF* UI-FILTER))
+            (authorized-log-user? user fname *STORM-CONF*))
+        (let [num-matches-int (if num-matches
+                                (try-parse-int-param "num-matches"
+                                  num-matches))
+              offset-int (if offset
+                           (try-parse-int-param "start-byte-offset" offset))]
+          (try
+            (if (and (not (empty? search))
+                  <= (count (.getBytes search "UTF-8")) grep-max-search-size)
+              (json-response
+                (substring-search file
+                  search
+                  :num-matches num-matches-int
+                  :start-byte-offset offset-int)
+                callback
+                :headers {"Access-Control-Allow-Origin" origin
+                          "Access-Control-Allow-Credentials" "true"})
+              (throw
+                (InvalidRequestException.
+                  (str "Search substring must be between 1 and 1024 UTF-8 "
+                    "bytes in size (inclusive)"))))
+            (catch Exception ex
+              (json-response (exception->json ex) callback :status 500))))
+        (json-response (unauthorized-user-json user) callback :status 401))
+      (json-response {"error" "Not Found"
+                      "errorMessage" "The file was not found on this node."}
+        callback
+        :status 404))))
+
+(defn find-n-matches [logs n file-offset offset search]
+  (let [logs (drop file-offset logs)
+        wrap-matches-fn (fn [matches]
+                          {"fileOffset" file-offset
+                           "searchString" search
+                           "matches" matches})]
+    (loop [matches []
+           logs logs
+           offset offset
+           file-offset file-offset
+           match-count 0]
+      (if (empty? logs)
+        (wrap-matches-fn matches)
+        (let [these-matches (try
+                              (log-debug "Looking through " (first logs))
+                              (substring-search (first logs)
+                                search
+                                :num-matches (- n match-count)
+                                :start-byte-offset offset)
+                              (catch InvalidRequestException e
+                                (log-error e "Can't search past end of file.")
+                                {}))
+              file-name (get-topo-port-workerlog (first logs))
+              new-matches (conj matches
+                            (merge these-matches
+                              { "fileName" file-name
+                                "port" (first (take-last 2 (split 
(.getCanonicalPath (first logs)) (re-pattern file-path-separator))))}))
+              new-count (+ match-count (count (these-matches "matches")))]
+          (if (empty? these-matches)
+            (recur matches (rest logs) 0 (+ file-offset 1) match-count)
+            (if (>= new-count n)
+              (wrap-matches-fn new-matches)
+              (recur new-matches (rest logs) 0 (+ file-offset 1) 
new-count))))))))
+
+(defn logs-for-port
+  "Get the filtered, authorized, sorted log files for a port."
+  [user port-dir]
+  (let [filter-authorized-fn (fn [user logs]
+                               (filter #(or
+                                          (blank? (*STORM-CONF* UI-FILTER))
+                                          (authorized-log-user? user 
(get-topo-port-workerlog %) *STORM-CONF*)) logs))]
+    (sort #(compare (.lastModified %2) (.lastModified %1))
+      (filter-authorized-fn
+        user
+        (filter #(re-find worker-log-filename-pattern (.getName %)) 
(DirectoryCleaner/getFilesForDir port-dir))))))
+
+(defn deep-search-logs-for-topology
+  [topology-id user ^String root-dir search num-matches port file-offset 
offset search-archived? callback origin]
+  (json-response
+    (if (or (not search) (not (.exists (File. (str root-dir 
file-path-separator topology-id)))))
+      []
+      (let [file-offset (if file-offset (Integer/parseInt file-offset) 0)
+            offset (if offset (Integer/parseInt offset) 0)
+            num-matches (or (Integer/parseInt num-matches) 1)
+            port-dirs (vec (.listFiles (File. (str root-dir 
file-path-separator topology-id))))
+            logs-for-port-fn (partial logs-for-port user)]
+        (if (or (not port) (= "*" port))
+          ;; Check for all ports
+          (let [filtered-logs (filter (comp not empty?) (map logs-for-port-fn 
port-dirs))]
+            (if search-archived?
+              (map #(find-n-matches % num-matches 0 0 search)
+                filtered-logs)
+              (map #(find-n-matches % num-matches 0 0 search)
+                (map (comp vector first) filtered-logs))))
+          ;; Check just the one port
+          (if (not (contains? (into #{} (map str (*STORM-CONF* 
SUPERVISOR-SLOTS-PORTS))) port))
+            []
+            (let [port-dir (File. (str root-dir file-path-separator 
topology-id file-path-separator port))]
+              (if (or (not (.exists port-dir)) (empty? (logs-for-port user 
port-dir)))
+                []
+                (let [filtered-logs (logs-for-port user port-dir)]
+                  (if search-archived?
+                    (find-n-matches filtered-logs num-matches file-offset 
offset search)
+                    (find-n-matches [(first filtered-logs)] num-matches 0 
offset search)))))))))
+    callback
+    :headers {"Access-Control-Allow-Origin" origin
+              "Access-Control-Allow-Credentials" "true"}))
+
+(defn log-template
+  ([body] (log-template body nil nil))
+  ([body fname user]
+    (html4
+     [:head
+      [:title (str (escape-html fname) " - Storm Log Viewer")]
+      (include-css "/css/bootstrap-3.3.1.min.css")
+      (include-css "/css/jquery.dataTables.1.10.4.min.css")
+      (include-css "/css/style.css")
+      ]
+     [:body
+      (concat
+        (when (not (blank? user)) [[:div.ui-user [:p "User: " user]]])
+        [[:div.ui-note [:p "Note: the drop-list shows at most 1024 files for 
each worker directory."]]]
+        [[:h3 (escape-html fname)]]
+        (seq body))
+      ])))
+
+(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
+
+(defn- parse-long-from-map [m k]
+  (try
+    (Long/parseLong (k m))
+    (catch NumberFormatException ex
+      (throw (InvalidRequestException.
+               (str "Could not make an integer out of the query parameter '"
+                    (name k) "'")
+               ex)))))
+
+(defn list-log-files
+  [user topoId port log-root callback origin]
+  (let [file-results
+        (if (nil? topoId)
+          (if (nil? port)
+            (get-all-logs-for-rootdir (File. log-root))
+            (reduce concat
+              (for [topo-dir (.listFiles (File. log-root))]
+                (reduce concat
+                  (for [port-dir (.listFiles topo-dir)]
+                    (if (= (str port) (.getName port-dir))
+                      (into [] (DirectoryCleaner/getFilesForDir 
port-dir))))))))
+          (if (nil? port)
+            (let [topo-dir (File. (str log-root file-path-separator topoId))]
+              (if (.exists topo-dir)
+                (reduce concat
+                  (for [port-dir (.listFiles topo-dir)]
+                    (into [] (DirectoryCleaner/getFilesForDir port-dir))))
+                []))
+            (let [port-dir (get-worker-dir-from-root log-root topoId port)]
+              (if (.exists port-dir)
+                (into [] (DirectoryCleaner/getFilesForDir port-dir))
+                []))))
+        file-strs (sort (for [file file-results]
+                          (get-topo-port-workerlog file)))]
+    (json-response file-strs
+      callback
+      :headers {"Access-Control-Allow-Origin" origin
+                "Access-Control-Allow-Credentials" "true"})))
+
+(defn get-profiler-dump-files
+  [dir]
+  (filter (comp not nil?)
+        (for [f (DirectoryCleaner/getFilesForDir dir)]
+          (let [name (.getName f)]
+            (if (or
+                  (.endsWith name ".txt")
+                  (.endsWith name ".jfr")
+                  (.endsWith name ".bin"))
+              (.getName f))))))
+
+(defroutes log-routes
+  (GET "/log" [:as req & m]
+    (try
+      (mark! logviewer:num-log-page-http-requests)
+      (let [servlet-request (:servlet-request req)
+            log-root (:log-root req)
+            user (.getUserName http-creds-handler servlet-request)
+            start (if (:start m) (parse-long-from-map m :start))
+            length (if (:length m) (parse-long-from-map m :length))
+            file (url-decode (:file m))]
+        (log-template (log-page file start length (:grep m) user log-root)
+          file user))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
+  (GET "/dumps/:topo-id/:host-port/:filename"
+       [:as {:keys [servlet-request servlet-response log-root]} topo-id 
host-port filename &m]
+     (let [user (.getUserName http-creds-handler servlet-request)
+           port (second (split host-port #":"))
+           dir (File. (str log-root
+                           file-path-separator
+                           topo-id
+                           file-path-separator
+                           port))
+           file (File. (str log-root
+                            file-path-separator
+                            topo-id
+                            file-path-separator
+                            port
+                            file-path-separator
+                            filename))]
+       (if (and (.exists dir) (.exists file))
+         (if (or (blank? (*STORM-CONF* UI-FILTER))
+               (authorized-log-user? user 
+                                     (str topo-id file-path-separator port 
file-path-separator "worker.log")
+                                     *STORM-CONF*))
+           (-> (resp/response file)
+               (resp/content-type "application/octet-stream"))
+           (unauthorized-user-html user))
+         (-> (resp/response "Page not found")
+           (resp/status 404)))))
+  (GET "/dumps/:topo-id/:host-port"
+       [:as {:keys [servlet-request servlet-response log-root]} topo-id 
host-port &m]
+     (let [user (.getUserName http-creds-handler servlet-request)
+           port (second (split host-port #":"))
+           dir (File. (str log-root
+                           file-path-separator
+                           topo-id
+                           file-path-separator
+                           port))]
+       (if (.exists dir)
+         (if (or (blank? (*STORM-CONF* UI-FILTER))
+               (authorized-log-user? user 
+                                     (str topo-id file-path-separator port 
file-path-separator "worker.log")
+                                     *STORM-CONF*))
+           (html4
+             [:head
+              [:title "File Dumps - Storm Log Viewer"]
+              (include-css "/css/bootstrap-3.3.1.min.css")
+              (include-css "/css/jquery.dataTables.1.10.4.min.css")
+              (include-css "/css/style.css")]
+             [:body
+              [:ul
+               (for [file (get-profiler-dump-files dir)]
+                 [:li
+                  [:a {:href (str "/dumps/" topo-id "/" host-port "/" file)} 
file ]])]])
+           (unauthorized-user-html user))
+         (-> (resp/response "Page not found")
+           (resp/status 404)))))
+  (GET "/daemonlog" [:as req & m]
+    (try
+      (mark! logviewer:num-daemonlog-page-http-requests)
+      (let [servlet-request (:servlet-request req)
+            daemonlog-root (:daemonlog-root req)
+            user (.getUserName http-creds-handler servlet-request)
+            start (if (:start m) (parse-long-from-map m :start))
+            length (if (:length m) (parse-long-from-map m :length))
+            file (url-decode (:file m))]
+        (log-template (daemonlog-page file start length (:grep m) user 
daemonlog-root)
+          file user))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
+  (GET "/download/:file" [:as {:keys [servlet-request servlet-response 
log-root]} file & m]
+    (try
+      (mark! logviewer:num-download-log-file-http-requests)
+      (let [user (.getUserName http-creds-handler servlet-request)]
+        (download-log-file file servlet-request servlet-response user 
log-root))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
+  (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response 
daemonlog-root]} file & m]
+    (try
+      (mark! logviewer:num-download-log-daemon-file-http-requests)
+      (let [user (.getUserName http-creds-handler servlet-request)]
+        (download-log-file file servlet-request servlet-response user 
daemonlog-root))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
+  (GET "/search/:file" [:as {:keys [servlet-request servlet-response 
log-root]} file & m]
+    ;; We do not use servlet-response here, but do not remove it from the
+    ;; :keys list, or this rule could stop working when an authentication
+    ;; filter is configured.
+    (try
+      (let [user (.getUserName http-creds-handler servlet-request)]
+        (search-log-file (url-decode file)
+          user
+          log-root
+          (:search-string m)
+          (:num-matches m)
+          (:start-byte-offset m)
+          (:callback m)
+          (.getHeader servlet-request "Origin")))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (json-response (exception->json ex) (:callback m) :status 400))))
+  (GET "/deepSearch/:topo-id" [:as {:keys [servlet-request servlet-response 
log-root]} topo-id & m]
+    ;; We do not use servlet-response here, but do not remove it from the
+    ;; :keys list, or this rule could stop working when an authentication
+    ;; filter is configured.
+    (try
+      (let [user (.getUserName http-creds-handler servlet-request)]
+        (deep-search-logs-for-topology topo-id
+          user
+          log-root
+          (:search-string m)
+          (:num-matches m)
+          (:port m)
+          (:start-file-offset m)
+          (:start-byte-offset m)
+          (:search-archived m)
+          (:callback m)
+          (.getHeader servlet-request "Origin")))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (json-response (exception->json ex) (:callback m) :status 400))))
+  (GET "/searchLogs" [:as req & m]
+    (try
+      (let [servlet-request (:servlet-request req)
+            user (.getUserName http-creds-handler servlet-request)]
+        (list-log-files user
+          (:topoId m)
+          (:port m)
+          (:log-root req)
+          (:callback m)
+          (.getHeader servlet-request "Origin")))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (json-response (exception->json ex) (:callback m) :status 400))))
+  (GET "/listLogs" [:as req & m]
+    (try
+      (mark! logviewer:num-list-logs-http-requests)
+      (let [servlet-request (:servlet-request req)
+            user (.getUserName http-creds-handler servlet-request)]
+        (list-log-files user
+          (:topoId m)
+          (:port m)
+          (:log-root req)
+          (:callback m)
+          (.getHeader servlet-request "Origin")))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (json-response (exception->json ex) (:callback m) :status 400))))
+  (route/resources "/")
+  (route/not-found "Page not found"))
+
+(defn conf-middleware
+  "For passing the storm configuration with each request."
+  [app log-root daemonlog-root]
+  (fn [req]
+    (app (assoc req :log-root log-root :daemonlog-root daemonlog-root))))
+
+(defn start-logviewer! [conf log-root-dir daemonlog-root-dir]
+  (try
+    (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
+          filter-class (conf UI-FILTER)
+          filter-params (conf UI-FILTER-PARAMS)
+          logapp (handler/api (-> log-routes
+                                requests-middleware))  ;; query params as map
+          middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
+          filters-confs (if (conf UI-FILTER)
+                          [{:filter-class filter-class
+                            :filter-params (or (conf UI-FILTER-PARAMS) {})}]
+                          [])
+          filters-confs (concat filters-confs
+                          [{:filter-class 
"org.eclipse.jetty.servlets.GzipFilter"
+                            :filter-name "Gzipper"
+                            :filter-params {}}])
+          https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0))
+          keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH)
+          keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD)
+          keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE)
+          key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD)
+          truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH)
+          truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD)
+          truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE)
+          want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH)
+          need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)]
+      (storm-run-jetty {:port (int (conf LOGVIEWER-PORT))
+                        :configurator (fn [server]
+                                        (config-ssl server
+                                                    https-port
+                                                    keystore-path
+                                                    keystore-pass
+                                                    keystore-type
+                                                    key-password
+                                                    truststore-path
+                                                    truststore-password
+                                                    truststore-type
+                                                    want-client-auth
+                                                    need-client-auth)
+                                        (config-filter server middle 
filters-confs))}))
+  (catch Exception ex
+    (log-error ex))))
+
+(defn -main []
+  (let [conf (read-storm-config)
+        log-root (worker-artifacts-root conf)
+        daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
+    (setup-default-uncaught-exception-handler)
+    (start-log-cleaner! conf log-root)
+    (log-message "Starting logviewer server for storm version '"
+                 STORM-VERSION
+                 "'")
+    (start-logviewer! conf log-root daemonlog-root)
+    (start-metrics-reporters)))

Reply via email to