STORM-1280 port backtype.storm.daemon.logviewer to java * ported logviewer-test.clj * TODO: some tests are failing, looking into it
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44b268ba Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44b268ba Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44b268ba Branch: refs/heads/master Commit: 44b268badb7ee1a92b1cc8abf81dbb640fa9af19 Parents: 11a7905 Author: Jungtaek Lim <[email protected]> Authored: Thu Jul 13 00:27:38 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Fri Jul 14 12:11:41 2017 +0900 ---------------------------------------------------------------------- bin/storm.py | 2 +- .../clj/org/apache/storm/daemon/logviewer.clj | 1254 ------------------ .../dev/logviewer-search-context-tests.log.gz | Bin 72 -> 0 bytes .../dev/logviewer-search-context-tests.log.test | 1 - storm-core/src/dev/small-worker.log.test | 1 - storm-core/src/dev/test-3072.log.test | 3 - storm-core/src/dev/test-worker.log.test | 380 ------ .../apache/storm/daemon/DirectoryCleaner.java | 188 --- .../clj/org/apache/storm/logviewer_test.clj | 824 ------------ .../daemon/logviewer/LogviewerConstant.java | 23 + .../storm/daemon/logviewer/LogviewerServer.java | 172 +++ .../handler/LogviewerLogDownloadHandler.java | 43 + .../handler/LogviewerLogPageHandler.java | 410 ++++++ .../handler/LogviewerLogSearchHandler.java | 707 ++++++++++ .../handler/LogviewerProfileHandler.java | 115 ++ .../logviewer/utils/DirectoryCleaner.java | 183 +++ .../daemon/logviewer/utils/LogCleaner.java | 305 +++++ .../logviewer/utils/LogFileDownloader.java | 51 + .../utils/LogviewerResponseBuilder.java | 118 ++ .../logviewer/utils/ResourceAuthorizer.java | 130 ++ .../daemon/logviewer/utils/WorkerLogs.java | 60 + .../logviewer/webapp/LogviewerApplication.java | 94 ++ .../logviewer/webapp/LogviewerResource.java | 221 +++ .../daemon/wip/logviewer/LogviewerConstant.java | 23 - .../daemon/wip/logviewer/LogviewerServer.java | 174 --- .../handler/LogviewerLogDownloadHandler.java | 43 - .../handler/LogviewerLogPageHandler.java | 412 ------ .../handler/LogviewerLogSearchHandler.java | 686 ---------- .../handler/LogviewerProfileHandler.java | 115 -- .../daemon/wip/logviewer/utils/LogCleaner.java | 296 ----- .../wip/logviewer/utils/LogFileDownloader.java | 51 - .../utils/LogviewerResponseBuilder.java | 118 -- .../wip/logviewer/utils/ResourceAuthorizer.java | 129 -- .../daemon/wip/logviewer/utils/WorkerLogs.java | 63 - .../logviewer/webapp/LogviewerApplication.java | 94 -- .../wip/logviewer/webapp/LogviewerResource.java | 221 --- .../storm/daemon/logviewer/LogviewerTest.java | 51 + .../handler/LogviewerLogPageHandlerTest.java | 101 ++ .../handler/LogviewerLogSearchHandlerTest.java | 854 ++++++++++++ .../testsupport/ArgumentsVerifier.java | 34 + .../testsupport/MockDirectoryBuilder.java | 66 + .../logviewer/testsupport/MockFileBuilder.java | 66 + .../daemon/logviewer/utils/LogCleanerTest.java | 376 ++++++ .../logviewer/utils/ResourceAuthorizerTest.java | 182 +++ .../logviewer-search-context-tests.log.gz | Bin 0 -> 72 bytes .../logviewer-search-context-tests.log.test | 1 + .../src/test/resources/small-worker.log.test | 1 + .../src/test/resources/test-3072.log.test | 3 + .../src/test/resources/test-worker.log.test | 380 ++++++ 49 files changed, 4748 insertions(+), 5077 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/bin/storm.py ---------------------------------------------------------------------- diff --git a/bin/storm.py b/bin/storm.py index dade6b5..a3c6506 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -812,7 +812,7 @@ def logviewer(): allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR) allextrajars.append(CLUSTER_CONF_DIR) exec_storm_class( - "org.apache.storm.daemon.wip.logviewer.LogviewerServer", + "org.apache.storm.daemon.logviewer.LogviewerServer", jvmtype="-server", daemonName="logviewer", jvmopts=jvmopts, http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj deleted file mode 100644 index 27b4ba1..0000000 --- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj +++ /dev/null @@ -1,1254 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.daemon.logviewer - (:use compojure.core) - (:use [clojure.set :only [difference intersection]]) - (:use [clojure.string :only [blank? split]]) - (:use [hiccup core page-helpers form-helpers]) - (:use [org.apache.storm config daemon-config util log]) - (:use [org.apache.storm.ui helpers]) - (:import [org.apache.storm StormTimer] - [org.apache.storm.daemon.supervisor ClientSupervisorUtils] - [org.apache.storm.daemon.supervisor SupervisorUtils] - [org.apache.storm.metric StormMetricsRegistry]) - (:import [org.apache.storm.utils Time VersionInfo ConfigUtils Utils ServerUtils ServerConfigUtils]) - (:import [java.util Arrays ArrayList HashSet]) - (:import [java.util.zip GZIPInputStream]) - (:import [org.apache.logging.log4j LogManager]) - (:import [org.apache.logging.log4j.core.appender RollingFileAppender]) - (:import [java.io BufferedInputStream File FileFilter FileInputStream - InputStream] - [java.net URLDecoder]) - (:import [java.nio.file Files DirectoryStream]) - (:import [java.nio ByteBuffer]) - (:import [org.apache.storm.daemon DirectoryCleaner]) - (:import [org.apache.storm.ui InvalidRequestException UIHelpers IConfigurator FilterConfiguration] - [org.apache.storm.security.auth AuthUtils]) - (:require [compojure.route :as route] - [compojure.handler :as handler] - [ring.middleware.keyword-params] - [ring.util.codec :as codec] - [ring.util.response :as resp] - [clojure.string :as string]) - (:gen-class)) - -(def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig))) -(def STORM-VERSION (VersionInfo/getVersion)) - -(def worker-log-filename-pattern #"^worker.log(.*)") - -(def logviewer:num-log-page-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-log-page-http-requests")) -(def logviewer:num-daemonlog-page-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-daemonlog-page-http-requests")) -(def logviewer:num-download-log-file-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-download-log-file-http-requests")) -(def logviewer:num-download-log-daemon-file-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-download-log-daemon-file-http-requests")) -(def logviewer:num-list-logs-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-list-logs-http-requests")) - -(defn cleanup-cutoff-age-millis [conf now-millis] - (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000))) - -(defn get-stream-for-dir - [^File f] - (try (Files/newDirectoryStream (.toPath f)) - (catch Exception ex (log-error ex) nil))) - -(defn- last-modifiedtime-worker-logdir - "Return the last modified time for all log files in a worker's log dir. - Using stream rather than File.listFiles is to avoid large mem usage - when a directory has too many files" - [^File log-dir] - (let [^DirectoryStream stream (get-stream-for-dir log-dir) - dir-modified (.lastModified log-dir) - last-modified (try (reduce - (fn [maximum path] - (let [curr (.lastModified (.toFile path))] - (if (> curr maximum) - curr - maximum))) - dir-modified - stream) - (catch Exception ex - (log-error ex) dir-modified) - (finally - (if (instance? DirectoryStream stream) - (.close stream))))] - last-modified)) - -(defn get-size-for-logdir - "Return the sum of lengths for all log files in a worker's log dir. - Using stream rather than File.listFiles is to avoid large mem usage - when a directory has too many files" - [log-dir] - (let [^DirectoryStream stream (get-stream-for-dir log-dir)] - (reduce - (fn [sum path] - (let [size (.length (.toFile path))] - (+ sum size))) - 0 - stream))) - -(defn mk-FileFilter-for-log-cleanup [conf now-millis] - (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)] - (reify FileFilter (^boolean accept [this ^File file] - (boolean (and - (not (.isFile file)) - (<= (last-modifiedtime-worker-logdir file) cutoff-age-millis))))))) - -(defn select-dirs-for-cleanup [conf now-millis root-dir] - (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)] - (reduce clojure.set/union - (sorted-set) - (for [^File topo-dir (.listFiles (File. root-dir))] - (into [] (.listFiles topo-dir file-filter)))))) - -(defn get-topo-port-workerlog - "Return the path of the worker log with the format of topoId/port/worker.log.*" - [^File file] - (clojure.string/join ServerUtils/FILE_PATH_SEPARATOR - (take-last 3 - (split (.getCanonicalPath file) (re-pattern ServerUtils/FILE_PATH_SEPARATOR))))) - -(defn get-metadata-file-for-log-root-name [root-name root-dir] - (let [metaFile (clojure.java.io/file root-dir "metadata" - (str root-name ".yaml"))] - (if (.exists metaFile) - metaFile - (do - (log-warn "Could not find " (.getCanonicalPath metaFile) - " to clean up for " root-name) - nil)))) - -(defn get-metadata-file-for-wroker-logdir [logdir] - (let [metaFile (clojure.java.io/file logdir "worker.yaml")] - (if (.exists metaFile) - metaFile - (do - (log-warn "Could not find " (.getCanonicalPath metaFile) - " to clean up for " logdir) - nil)))) - -(defn get-worker-id-from-metadata-file [metaFile] - (get (clojurify-structure (Utils/readYamlFile metaFile)) "worker-id")) - -(defn get-topo-owner-from-metadata-file [metaFile] - (get (clojurify-structure (Utils/readYamlFile metaFile)) TOPOLOGY-SUBMITTER-USER)) - -(defn identify-worker-log-dirs [log-dirs] - "return the workerid to worker-log-dir map" - (into {} (for [logdir log-dirs - :let [metaFile (get-metadata-file-for-wroker-logdir logdir)]] - (if metaFile - {(get-worker-id-from-metadata-file metaFile) logdir} - {"" logdir})))) ;; an old directory that has no yaml file will be treated as a dead dir for deleting - -(defn get-alive-ids - [conf now-secs] - (->> - (clojurify-structure (SupervisorUtils/readWorkerHeartbeats conf)) - (remove - #(or (not (val %)) - (SupervisorUtils/isWorkerHbTimedOut now-secs - (val %) - conf))) - keys - set)) - -(defn get-dead-worker-dirs - "Return a sorted set of java.io.Files that were written by workers that are - now dead" - [conf now-secs log-dirs] - (if (empty? log-dirs) - (sorted-set) - (let [alive-ids (get-alive-ids conf now-secs) - id->dir (identify-worker-log-dirs log-dirs)] - (apply sorted-set - (for [[id dir] id->dir - :when (not (contains? alive-ids id))] - dir))))) - -(defn get-all-worker-dirs [^File root-dir] - (reduce clojure.set/union - (sorted-set) - (for [^File topo-dir (.listFiles root-dir)] - (into [] (.listFiles topo-dir))))) - -(defn get-alive-worker-dirs - "Return a sorted set of java.io.Files that were written by workers that are - now active" - [conf root-dir] - (let [alive-ids (get-alive-ids conf (Time/currentTimeSecs)) - log-dirs (get-all-worker-dirs root-dir) - id->dir (identify-worker-log-dirs log-dirs)] - (apply sorted-set - (for [[id dir] id->dir - :when (contains? alive-ids id)] - (.getCanonicalPath dir))))) - -(defn get-all-logs-for-rootdir [^File log-dir] - (reduce concat - (for [port-dir (get-all-worker-dirs log-dir)] - (into [] (DirectoryCleaner/getFilesForDir port-dir))))) - -(defn is-active-log [^File file] - (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file))) - -(defn sum-file-size - "Given a sequence of Files, sum their sizes." - [files] - (reduce #(+ %1 (.length %2)) 0 files)) - -(defn per-workerdir-cleanup! - "Delete the oldest files in each overloaded worker log dir" - [^File root-dir size ^DirectoryCleaner cleaner] - (dofor [worker-dir (get-all-worker-dirs root-dir)] - (.deleteOldestWhileTooLarge cleaner (ArrayList. [worker-dir]) size true nil))) - -(defn global-log-cleanup! - "Delete the oldest files in overloaded worker-artifacts globally" - [^File root-dir size ^DirectoryCleaner cleaner] - (let [worker-dirs (ArrayList. (get-all-worker-dirs root-dir)) - alive-worker-dirs (HashSet. (get-alive-worker-dirs *STORM-CONF* root-dir))] - (.deleteOldestWhileTooLarge cleaner worker-dirs size false alive-worker-dirs))) - -(defn cleanup-empty-topodir! - "Delete the topo dir if it contains zero port dirs" - [^File dir] - (let [topodir (.getParentFile dir)] - (if (empty? (.listFiles topodir)) - (Utils/forceDelete (.getCanonicalPath topodir))))) - -(defn cleanup-fn! - "Delete old log dirs for which the workers are no longer alive" - [log-root-dir] - (let [now-secs (Time/currentTimeSecs) - old-log-dirs (select-dirs-for-cleanup *STORM-CONF* - (* now-secs 1000) - log-root-dir) - total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB) - per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB) - per-dir-size (min per-dir-size (* total-size 0.5)) - cleaner (DirectoryCleaner.) - dead-worker-dirs (get-dead-worker-dirs *STORM-CONF* - now-secs - old-log-dirs)] - (log-debug "log cleanup: now=" now-secs - " old log dirs " (pr-str (map #(.getName %) old-log-dirs)) - " dead worker dirs " (pr-str - (map #(.getName %) dead-worker-dirs))) - (dofor [dir dead-worker-dirs] - (let [path (.getCanonicalPath dir)] - (log-message "Cleaning up: Removing " path) - (try (Utils/forceDelete path) - (cleanup-empty-topodir! dir) - (catch Exception ex (log-error ex))))) - (per-workerdir-cleanup! (File. log-root-dir) (* per-dir-size (* 1024 1024)) cleaner) - (let [size (* total-size (* 1024 1024))] - (global-log-cleanup! (File. log-root-dir) size cleaner)))) - -(defn start-log-cleaner! [conf log-root-dir] - (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)] - (when interval-secs - (log-debug "starting log cleanup thread at interval: " interval-secs) - - (let [timer (StormTimer. "logviewer-cleanup" - (reify Thread$UncaughtExceptionHandler - (^void uncaughtException - [this ^Thread t ^Throwable e] - (log-error t "Error when doing logs cleanup") - (Utils/exitProcess 20 "Error when doing log cleanup"))))] - (.scheduleRecurring timer 0 interval-secs - (fn [] (cleanup-fn! log-root-dir))))))) - -(defn- skip-bytes - "FileInputStream#skip may not work the first time, so ensure it successfully - skips the given number of bytes." - [^InputStream stream n] - (loop [skipped 0] - (let [skipped (+ skipped (.skip stream (- n skipped)))] - (if (< skipped n) (recur skipped))))) - -(defn logfile-matches-filter? - [log-file-name] - (let [regex-string (str "worker.log.*") - regex-pattern (re-pattern regex-string)] - (not= (re-seq regex-pattern (.toString log-file-name)) nil))) - -(defn page-file - ([path tail] - (let [zip-file? (.endsWith path ".gz") - flen (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path))) - skip (- flen tail)] - (page-file path skip tail))) - ([path start length] - (let [zip-file? (.endsWith path ".gz") - flen (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))] - (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. path)) (FileInputStream. path)) - output (java.io.ByteArrayOutputStream.)] - (if (>= start flen) - (throw - (InvalidRequestException. "Cannot start past the end of the file"))) - (if (> start 0) (skip-bytes input start)) - (let [buffer (make-array Byte/TYPE 1024)] - (loop [] - (when (< (.size output) length) - (let [size (.read input buffer 0 (min 1024 (- length (.size output))))] - (when (pos? size) - (.write output buffer 0 size) - (recur))))) - (.toString output)))))) - -(defn get-log-user-group-whitelist [fname] - (let [wl-file (ServerConfigUtils/getLogMetaDataFile fname) - m (clojurify-structure (Utils/readYamlFile wl-file))] - (if (not-nil? m) - (do - (let [user-wl (.get m LOGS-USERS) - user-wl (if user-wl user-wl []) - group-wl (.get m LOGS-GROUPS) - group-wl (if group-wl group-wl [])] - [user-wl group-wl])) - nil))) - -(def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin *STORM-CONF*)) -(defn user-groups - [user] - (if (blank? user) [] (.getGroups igroup-mapper user))) - -(defn authorized-log-user? [user fname conf] - (if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist fname))) - nil - (let [groups (user-groups user) - [user-wl group-wl] (get-log-user-group-whitelist fname) - logs-users (concat (conf LOGS-USERS) - (conf NIMBUS-ADMINS) - user-wl) - logs-groups (concat (conf LOGS-GROUPS) - group-wl)] - (or (some #(= % user) logs-users) - (< 0 (.size (intersection (set groups) (set logs-groups)))))))) - -(defn log-root-dir - "Given an appender name, as configured, get the parent directory of the appender's log file. - Note that if anything goes wrong, this will throw an Error and exit." - [appender-name] - (let [appender (.getAppender (.getConfiguration (LogManager/getContext)) appender-name)] - (if (and appender-name appender (instance? RollingFileAppender appender)) - (.getParent (File. (.getFileName appender))) - (throw - (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j agree."))))) - -(defnk to-btn-link - "Create a link that is formatted like a button" - [url text :enabled true] - [:a {:href (java.net.URI. url) - :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text]) - -(defn search-file-form [fname is-daemon] - [[:form {:action "logviewer_search.html" :id "search-box"} - "Search this file:" - [:input {:type "text" :name "search"}] - [:input {:type "hidden" :name "is-daemon" :value is-daemon}] - [:input {:type "hidden" :name "file" :value fname}] - [:input {:type "submit" :value "Search"}]]]) - -(defn log-file-selection-form [log-files type] - [[:form {:action type :id "list-of-files"} - (drop-down "file" log-files) - [:input {:type "submit" :value "Switch file"}]]]) - -(defn pager-links [fname start length file-size type] - (let [prev-start (max 0 (- start length)) - next-start (if (> file-size 0) - (min (max 0 (- file-size length)) (+ start length)) - (+ start length))] - [[:div - (concat - [(to-btn-link (url (str "/" type) - {:file fname - :start (max 0 (- start length)) - :length length}) - "Prev" :enabled (< prev-start start))] - [(to-btn-link (url (str "/" type) - {:file fname - :start 0 - :length length}) "First")] - [(to-btn-link (url (str "/" type) - {:file fname - :length length}) - "Last")] - [(to-btn-link (url (str "/" type) - {:file fname - :start (min (max 0 (- file-size length)) - (+ start length)) - :length length}) - "Next" :enabled (> next-start start))])]])) - -(defn- download-link [fname] - [[:p (link-to (UIHelpers/urlFormat "/download?file=%s" (to-array [fname])) "Download Full File")]]) - -(defn- daemon-download-link [fname] - [[:p (link-to (UIHelpers/urlFormat "/daemondownload/%s" (to-array [fname])) "Download Full File")]]) - -(defn- is-txt-file [fname] - (re-find #"\.(log.*|txt|yaml|pid)$" fname)) - -(defn unauthorized-user-html [user] - [[:h2 "User '" (escape-html user) "' is not authorized."]]) - -(defn ring-response-from-exception [ex] - {:headers {} - :status 400 - :body (.getMessage ex)}) - -(def default-bytes-per-page 51200) - -(defn log-page [fname start length grep user root-dir] - (if (or (blank? (*STORM-CONF* UI-FILTER)) - (authorized-log-user? user fname *STORM-CONF*)) - (let [file (.getCanonicalFile (File. root-dir fname)) - path (.getCanonicalPath file) - zip-file? (.endsWith path ".gz") - topo-dir (.getParentFile (.getParentFile file))] - (if (and (.exists file) - (= (.getCanonicalFile (File. root-dir)) - (.getParentFile topo-dir))) - (let [file-length (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path))) - log-files (reduce clojure.set/union - (sorted-set) - (for [^File port-dir (.listFiles topo-dir)] - (into [] (filter #(.isFile %) (DirectoryCleaner/getFilesForDir port-dir))))) ;all types of files included - files-str (for [file log-files] - (get-topo-port-workerlog file)) - reordered-files-str (conj (filter #(not= fname %) files-str) fname) - length (if length - (min 10485760 length) - default-bytes-per-page) - log-string (escape-html - (if (is-txt-file fname) - (if start - (page-file path start length) - (page-file path length)) - "This is a binary file and cannot display! You may download the full file.")) - start (or start (- file-length length))] - (if grep - (html [:pre#logContent - (if grep - (->> (.split log-string "\n") - (filter #(.contains % grep)) - (string/join "\n")) - log-string)]) - (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length "log") nil)] - (html (concat (search-file-form fname "no") - (log-file-selection-form reordered-files-str "log") ; list all files for this topology - pager-data - (download-link fname) - [[:pre#logContent log-string]] - pager-data))))) - (-> (resp/response "Page not found") - (resp/status 404)))) - (if (nil? (get-log-user-group-whitelist fname)) - (-> (resp/response "Page not found") - (resp/status 404)) - (unauthorized-user-html user)))) - -(defn daemonlog-page [fname start length grep user root-dir] - (let [file (.getCanonicalFile (File. root-dir fname)) - file-length (.length file) - path (.getCanonicalPath file) - zip-file? (.endsWith path ".gz")] - (if (and (= (.getCanonicalFile (File. root-dir)) - (.getParentFile file)) - (.exists file)) - (let [file-length (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path))) - length (if length - (min 10485760 length) - default-bytes-per-page) - log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included - files-str (for [file log-files] - (.getName file)) - reordered-files-str (conj (filter #(not= fname %) files-str) fname) - log-string (escape-html - (if (is-txt-file fname) - (if start - (page-file path start length) - (page-file path length)) - "This is a binary file and cannot display! You may download the full file.")) - start (or start (- file-length length))] - (if grep - (html [:pre#logContent - (if grep - (->> (.split log-string "\n") - (filter #(.contains % grep)) - (string/join "\n")) - log-string)]) - (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length "daemonlog") nil)] - (html (concat (search-file-form fname "yes") - (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs - pager-data - (daemon-download-link fname) - [[:pre#logContent log-string]] - pager-data))))) - (-> (resp/response "Page not found") - (resp/status 404))))) - -(defnk set-log-file-permissions [fname root-dir] - (let [file (.getCanonicalFile (File. root-dir fname)) - run-as-user (*STORM-CONF* SUPERVISOR-RUN-WORKER-AS-USER) - parent (.getParent (File. root-dir fname)) - md-file (if (nil? parent) nil (get-metadata-file-for-wroker-logdir parent)) - topo-owner (if (nil? md-file) nil (get-topo-owner-from-metadata-file (.getCanonicalPath md-file)))] - (when (and run-as-user - (not-nil? topo-owner) - (.exists file) - (not (Files/isReadable (.toPath file)))) - (log-debug "Setting permissions on file " fname " with topo-owner " topo-owner) - (ClientSupervisorUtils/processLauncherAndWait *STORM-CONF* topo-owner ["blob" (.getCanonicalPath file)] nil (str "setup group read permissions for file: " fname))))) - -(defnk download-log-file [fname req resp user ^String root-dir :is-daemon false] - (let [file (.getCanonicalFile (File. root-dir fname))] - (if (.exists file) - - (if (or is-daemon - (or (blank? (*STORM-CONF* UI-FILTER)) - (authorized-log-user? user fname *STORM-CONF*))) - (-> (resp/response file) - (resp/content-type "application/octet-stream")) - (unauthorized-user-html user)) - (-> (resp/response "Page not found") - (resp/status 404))))) - -(def grep-max-search-size 1024) -(def grep-buf-size 2048) -(def grep-context-size 128) - -(defn logviewer-port - [] - (int (*STORM-CONF* LOGVIEWER-PORT))) - -(defn url-to-match-centered-in-log-page - [needle fname offset port] - (let [host (Utils/hostname) - port (logviewer-port) - fname (clojure.string/join ServerUtils/FILE_PATH_SEPARATOR (take-last 3 (split fname (re-pattern ServerUtils/FILE_PATH_SEPARATOR))))] - (url (str "http://" host ":" port "/log") - {:file fname - :start (max 0 - (- offset - (int (/ default-bytes-per-page 2)) - (int (/ (alength needle) -2)))) ;; Addition - :length default-bytes-per-page}))) - -(defn url-to-match-centered-in-log-page-daemon-file - [needle fname offset port] - (let [host (Utils/hostname) - port (logviewer-port) - fname (clojure.string/join ServerUtils/FILE_PATH_SEPARATOR (take-last 1 (split fname (re-pattern ServerUtils/FILE_PATH_SEPARATOR))))] - (url (str "http://" host ":" port "/daemonlog") - {:file fname - :start (max 0 - (- offset - (int (/ default-bytes-per-page 2)) - (int (/ (alength needle) -2)))) ;; Addition - :length default-bytes-per-page}))) - -(defnk mk-match-data - [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname - :is-daemon false :before-bytes nil :after-bytes nil] - (let [url (if is-daemon - (url-to-match-centered-in-log-page-daemon-file needle - fname - file-offset - (*STORM-CONF* LOGVIEWER-PORT)) - (url-to-match-centered-in-log-page needle - fname - file-offset - (*STORM-CONF* LOGVIEWER-PORT))) - haystack-bytes (.array haystack) - before-string (if (>= haystack-offset grep-context-size) - (String. haystack-bytes - (- haystack-offset grep-context-size) - grep-context-size - "UTF-8") - (let [num-desired (max 0 (- grep-context-size - haystack-offset)) - before-size (if before-bytes - (alength before-bytes) - 0) - num-expected (min before-size num-desired)] - (if (pos? num-expected) - (str (String. before-bytes - (- before-size num-expected) - num-expected - "UTF-8") - (String. haystack-bytes - 0 - haystack-offset - "UTF-8")) - (String. haystack-bytes - 0 - haystack-offset - "UTF-8")))) - after-string (let [needle-size (alength needle) - after-offset (+ haystack-offset needle-size) - haystack-size (.limit haystack)] - (if (< (+ after-offset grep-context-size) haystack-size) - (String. haystack-bytes - after-offset - grep-context-size - "UTF-8") - (let [num-desired (- grep-context-size - (- haystack-size after-offset)) - after-size (if after-bytes - (alength after-bytes) - 0) - num-expected (min after-size num-desired)] - (if (pos? num-expected) - (str (String. haystack-bytes - after-offset - (- haystack-size after-offset) - "UTF-8") - (String. after-bytes 0 num-expected "UTF-8")) - (String. haystack-bytes - after-offset - (- haystack-size after-offset) - "UTF-8")))))] - {"byteOffset" file-offset - "beforeString" before-string - "afterString" after-string - "matchString" (String. needle "UTF-8") - "logviewerURL" url})) - -(defn- try-read-ahead! - "Tries once to read ahead in the stream to fill the context and resets the - stream to its position before the call." - [^BufferedInputStream stream haystack offset file-len bytes-read] - (let [num-expected (min (- file-len bytes-read) - grep-context-size) - after-bytes (byte-array num-expected)] - (.mark stream num-expected) - ;; Only try reading once. - (.read stream after-bytes 0 num-expected) - (.reset stream) - after-bytes)) - -(defn offset-of-bytes - "Searches a given byte array for a match of a sub-array of bytes. Returns - the offset to the byte that matches, or -1 if no match was found." - [^bytes buf ^bytes value init-offset] - {:pre [(> (alength value) 0) - (not (neg? init-offset))]} - (loop [offset init-offset - candidate-offset init-offset - val-offset 0] - (if-not (pos? (- (alength value) val-offset)) - ;; Found - candidate-offset - (if (>= offset (alength buf)) - ;; We ran out of buffer for the search. - -1 - (if (not= (aget value val-offset) (aget buf offset)) - ;; The match at this candidate offset failed, so start over with the - ;; next candidate byte from the buffer. - (let [new-offset (inc candidate-offset)] - (recur new-offset new-offset 0)) - ;; So far it matches. Keep going... - (recur (inc offset) candidate-offset (inc val-offset))))))) - -(defn- buffer-substring-search! - "As the file is read into a buffer, 1/2 the buffer's size at a time, we - search the buffer for matches of the substring and return a list of zero or - more matches." - [is-daemon file file-len offset-to-buf init-buf-offset stream bytes-skipped - bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches - ^bytes before-bytes] - (loop [buf-offset init-buf-offset - matches initial-matches] - (let [offset (offset-of-bytes (.array haystack) needle buf-offset)] - (if (and (< (count matches) num-matches) (not (neg? offset))) - (let [file-offset (+ offset-to-buf offset) - bytes-needed-after-match (- (.limit haystack) - grep-context-size - (alength needle)) - before-arg (if (< offset grep-context-size) before-bytes) - after-arg (if (> offset bytes-needed-after-match) - (try-read-ahead! stream - haystack - offset - file-len - bytes-read))] - (recur (+ offset (alength needle)) - (conj matches - (mk-match-data needle - haystack - offset - file-offset - (.getCanonicalPath file) - :is-daemon is-daemon - :before-bytes before-arg - :after-bytes after-arg)))) - (let [before-str-to-offset (min (.limit haystack) - grep-max-search-size) - before-str-from-offset (max 0 (- before-str-to-offset - grep-context-size)) - new-before-bytes (Arrays/copyOfRange (.array haystack) - before-str-from-offset - before-str-to-offset) - ;; It's OK if new-byte-offset is negative. This is normal if - ;; we are out of bytes to read from a small file. - new-byte-offset (if (>= (count matches) num-matches) - (+ (get (last matches) "byteOffset") - (alength needle)) - (+ bytes-skipped - bytes-read - (- grep-max-search-size)))] - [matches new-byte-offset new-before-bytes]))))) - -(defn- mk-grep-response - "This response data only includes a next byte offset if there is more of the - file to read." - [search-bytes offset matches next-byte-offset] - (merge {"searchString" (String. search-bytes "UTF-8") - "startByteOffset" offset - "matches" matches} - (and next-byte-offset {"nextByteOffset" next-byte-offset}))) - -(defn rotate-grep-buffer! - [^ByteBuffer buf ^BufferedInputStream stream total-bytes-read file file-len] - (let [buf-arr (.array buf)] - ;; Copy the 2nd half of the buffer to the first half. - (System/arraycopy buf-arr - grep-max-search-size - buf-arr - 0 - grep-max-search-size) - ;; Zero-out the 2nd half to prevent accidental matches. - (Arrays/fill buf-arr - grep-max-search-size - (count buf-arr) - (byte 0)) - ;; Fill the 2nd half with new bytes from the stream. - (let [bytes-read (.read stream - buf-arr - grep-max-search-size - (min file-len grep-max-search-size))] - (.limit buf (+ grep-max-search-size bytes-read)) - (swap! total-bytes-read + bytes-read)))) - -(defnk substring-search - "Searches for a substring in a log file, starting at the given offset, - returning the given number of matches, surrounded by the given number of - context lines. Other information is included to be useful for progressively - searching through a file for display in a UI. The search string must - grep-max-search-size bytes or fewer when decoded with UTF-8." - [file ^String search-string :is-daemon false :num-matches 10 :start-byte-offset 0] - {:pre [(not (empty? search-string)) - (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]} - (let [zip-file? (.endsWith (.getName file) ".gz") - f-input-steam (FileInputStream. file) - gzipped-input-stream (if zip-file? - (GZIPInputStream. f-input-steam) - f-input-steam) - stream ^BufferedInputStream (BufferedInputStream. - gzipped-input-stream) - file-len (if zip-file? (ServerUtils/zipFileSize file) (.length file)) - buf ^ByteBuffer (ByteBuffer/allocate grep-buf-size) - buf-arr ^bytes (.array buf) - string nil - total-bytes-read (atom 0) - matches [] - search-bytes ^bytes (.getBytes search-string "UTF-8") - num-matches (or num-matches 10) - start-byte-offset (or start-byte-offset 0)] - ;; Start at the part of the log file we are interested in. - ;; Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files - (if (> start-byte-offset file-len) - (throw - (InvalidRequestException. "Cannot search past the end of the file"))) - (when (> start-byte-offset 0) - (skip-bytes stream start-byte-offset)) - (java.util.Arrays/fill buf-arr (byte 0)) - (let [bytes-read (.read stream buf-arr 0 (min file-len grep-buf-size))] - (.limit buf bytes-read) - (swap! total-bytes-read + bytes-read)) - (loop [initial-matches [] - init-buf-offset 0 - byte-offset start-byte-offset - before-bytes nil] - (let [[matches new-byte-offset new-before-bytes] - (buffer-substring-search! - is-daemon - file - file-len - byte-offset - init-buf-offset - stream - start-byte-offset - @total-bytes-read - buf - search-bytes - initial-matches - num-matches - before-bytes)] - (if (and (< (count matches) num-matches) - (< (+ @total-bytes-read start-byte-offset) file-len)) - (let [;; The start index is positioned to find any possible - ;; occurrence search string that did not quite fit in the - ;; buffer on the previous read. - new-buf-offset (- (min (.limit ^ByteBuffer buf) - grep-max-search-size) - (alength search-bytes))] - (rotate-grep-buffer! buf stream total-bytes-read file file-len) - (when (< @total-bytes-read 0) - (throw (InvalidRequestException. "Cannot search past the end of the file"))) - (recur matches - new-buf-offset - new-byte-offset - new-before-bytes)) - (merge {"isDaemon" (if is-daemon "yes" "no")} - (mk-grep-response search-bytes - start-byte-offset - matches - (if-not (and (< (count matches) num-matches) - (>= @total-bytes-read file-len)) - (let [next-byte-offset (+ (get (last matches) - "byteOffset") - (alength search-bytes))] - (if (> file-len next-byte-offset) - next-byte-offset)))))))))) - -(defn- try-parse-int-param - [nam value] - (try - (Integer/parseInt value) - (catch java.lang.NumberFormatException e - (-> - (str "Could not parse " nam " to an integer") - (InvalidRequestException. e) - throw)))) - -(defn search-log-file - [fname user ^String root-dir is-daemon search num-matches offset callback origin] - (let [file (.getCanonicalFile (File. root-dir fname))] - (if (.exists file) - (if (or is-daemon - (or (blank? (*STORM-CONF* UI-FILTER)) - (authorized-log-user? user fname *STORM-CONF*))) - (let [num-matches-int (if num-matches - (try-parse-int-param "num-matches" - num-matches)) - offset-int (if offset - (try-parse-int-param "start-byte-offset" offset))] - (try - (if (and (not (empty? search)) - <= (count (.getBytes search "UTF-8")) grep-max-search-size) - (json-response - (merge {"isDaemon" (if is-daemon "yes" "no")} - (substring-search file - search - :is-daemon is-daemon - :num-matches num-matches-int - :start-byte-offset offset-int)) - - callback - :headers {"Access-Control-Allow-Origin" origin - "Access-Control-Allow-Credentials" "true"}) - (throw - (InvalidRequestException. - (str "Search substring must be between 1 and 1024 UTF-8 " - "bytes in size (inclusive)")))) - (catch Exception ex - (json-response (UIHelpers/exceptionToJson ex) callback :status 500)))) - (json-response (UIHelpers/unauthorizedUserJson user) callback :status 401)) - (json-response {"error" "Not Found" - "errorMessage" "The file was not found on this node."} - callback - :status 404)))) - -(defn find-n-matches [logs n file-offset offset search] - (let [logs (drop file-offset logs) - wrap-matches-fn (fn [matches] - {"fileOffset" file-offset - "searchString" search - "matches" matches})] - (loop [matches [] - logs logs - offset offset - file-offset file-offset - match-count 0] - (if (empty? logs) - (wrap-matches-fn matches) - (let [these-matches (try - (log-debug "Looking through " (first logs)) - (substring-search (first logs) - search - :num-matches (- n match-count) - :start-byte-offset offset) - (catch InvalidRequestException e - (log-error e "Can't search past end of file.") - {})) - file-name (get-topo-port-workerlog (first logs)) - new-matches (conj matches - (merge these-matches - { "fileName" file-name - "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern ServerUtils/FILE_PATH_SEPARATOR))))})) - new-count (+ match-count (count (these-matches "matches")))] - (if (empty? these-matches) - (recur matches (rest logs) 0 (+ file-offset 1) match-count) - (if (>= new-count n) - (wrap-matches-fn new-matches) - (recur new-matches (rest logs) 0 (+ file-offset 1) new-count)))))))) - -(defn logs-for-port - "Get the filtered, authorized, sorted log files for a port." - [user port-dir] - (let [filter-authorized-fn (fn [user logs] - (filter #(or - (blank? (*STORM-CONF* UI-FILTER)) - (authorized-log-user? user (get-topo-port-workerlog %) *STORM-CONF*)) logs))] - (sort #(compare (.lastModified %2) (.lastModified %1)) - (filter-authorized-fn - user - (filter #(re-find worker-log-filename-pattern (.getName %)) (DirectoryCleaner/getFilesForDir port-dir)))))) - -(defn deep-search-logs-for-topology - [topology-id user ^String root-dir search num-matches port file-offset offset search-archived? callback origin] - (json-response - (if (or (not search) (not (.exists (File. (str root-dir ServerUtils/FILE_PATH_SEPARATOR topology-id))))) - [] - (let [file-offset (if file-offset (Integer/parseInt file-offset) 0) - offset (if offset (Integer/parseInt offset) 0) - num-matches (or (Integer/parseInt num-matches) 1) - port-dirs (vec (.listFiles (File. (str root-dir ServerUtils/FILE_PATH_SEPARATOR topology-id)))) - logs-for-port-fn (partial logs-for-port user)] - (if (or (not port) (= "*" port)) - ;; Check for all ports - (let [filtered-logs (filter (comp not empty?) (map logs-for-port-fn port-dirs))] - (if search-archived? - (map #(find-n-matches % num-matches 0 0 search) - filtered-logs) - (map #(find-n-matches % num-matches 0 0 search) - (map (comp vector first) filtered-logs)))) - ;; Check just the one port - (if (not (contains? (into #{} (map str (*STORM-CONF* SUPERVISOR-SLOTS-PORTS))) port)) - [] - (let [port-dir (File. (str root-dir ServerUtils/FILE_PATH_SEPARATOR topology-id ServerUtils/FILE_PATH_SEPARATOR port))] - (if (or (not (.exists port-dir)) (empty? (logs-for-port user port-dir))) - [] - (let [filtered-logs (logs-for-port user port-dir)] - (if search-archived? - (find-n-matches filtered-logs num-matches file-offset offset search) - (find-n-matches [(first filtered-logs)] num-matches 0 offset search))))))))) - callback - :headers {"Access-Control-Allow-Origin" origin - "Access-Control-Allow-Credentials" "true"})) - -(defn log-template - ([body] (log-template body nil nil)) - ([body fname user] - (html4 - [:head - [:title (str (escape-html fname) " - Storm Log Viewer")] - (include-css "/css/bootstrap-3.3.1.min.css") - (include-css "/css/jquery.dataTables.1.10.4.min.css") - (include-css "/css/style.css") - ] - [:body - (concat - (when (not (blank? user)) [[:div.ui-user [:p "User: " user]]]) - [[:div.ui-note [:p "Note: the drop-list shows at most 1024 files for each worker directory."]]] - [[:h3 (escape-html fname)]] - (seq body)) - ]))) - -(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*)) - -(defn- parse-long-from-map [m k] - (try - (Long/parseLong (k m)) - (catch NumberFormatException ex - (throw (InvalidRequestException. - (str "Could not make an integer out of the query parameter '" - (name k) "'") - ex))))) - -(defn list-log-files - [user topoId port log-root callback origin] - (let [file-results - (if (nil? topoId) - (if (nil? port) - (get-all-logs-for-rootdir (File. log-root)) - (reduce concat - (for [topo-dir (.listFiles (File. log-root))] - (reduce concat - (for [port-dir (.listFiles topo-dir)] - (if (= (str port) (.getName port-dir)) - (into [] (DirectoryCleaner/getFilesForDir port-dir)))))))) - (if (nil? port) - (let [topo-dir (File. (str log-root ServerUtils/FILE_PATH_SEPARATOR topoId))] - (if (.exists topo-dir) - (reduce concat - (for [port-dir (.listFiles topo-dir)] - (into [] (DirectoryCleaner/getFilesForDir port-dir)))) - [])) - (let [port-dir (ConfigUtils/getWorkerDirFromRoot log-root topoId port)] - (if (.exists port-dir) - (into [] (DirectoryCleaner/getFilesForDir port-dir)) - [])))) - file-strs (sort (for [file file-results] - (get-topo-port-workerlog file)))] - (json-response file-strs - callback - :headers {"Access-Control-Allow-Origin" origin - "Access-Control-Allow-Credentials" "true"}))) - -(defn get-profiler-dump-files - [dir] - (filter (comp not nil?) - (for [f (DirectoryCleaner/getFilesForDir dir)] - (let [name (.getName f)] - (if (or - (.endsWith name ".txt") - (.endsWith name ".jfr") - (.endsWith name ".bin")) - (.getName f)))))) - -(defroutes log-routes - (GET "/log" [:as req & m] - (try - (.mark logviewer:num-log-page-http-requests) - (let [servlet-request (:servlet-request req) - log-root (:log-root req) - user (.getUserName http-creds-handler servlet-request) - start (if (:start m) (parse-long-from-map m :start)) - length (if (:length m) (parse-long-from-map m :length)) - file (URLDecoder/decode (:file m))] - (set-log-file-permissions file log-root) - (log-template (log-page file start length (:grep m) user log-root) - file user)) - (catch InvalidRequestException ex - (log-error ex) - (ring-response-from-exception ex)))) - (GET "/dumps/:topo-id/:host-port/:filename" - [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port filename &m] - (let [user (.getUserName http-creds-handler servlet-request) - port (second (split host-port #":")) - dir (File. (str log-root - ServerUtils/FILE_PATH_SEPARATOR - topo-id - ServerUtils/FILE_PATH_SEPARATOR - port)) - file (File. (str log-root - ServerUtils/FILE_PATH_SEPARATOR - topo-id - ServerUtils/FILE_PATH_SEPARATOR - port - ServerUtils/FILE_PATH_SEPARATOR - filename))] - (if (and (.exists dir) (.exists file)) - (if (or (blank? (*STORM-CONF* UI-FILTER)) - (authorized-log-user? user - (str topo-id ServerUtils/FILE_PATH_SEPARATOR port ServerUtils/FILE_PATH_SEPARATOR "worker.log") - *STORM-CONF*)) - (-> (resp/response file) - (resp/content-type "application/octet-stream")) - (unauthorized-user-html user)) - (-> (resp/response "Page not found") - (resp/status 404))))) - (GET "/dumps/:topo-id/:host-port" - [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port &m] - (let [user (.getUserName http-creds-handler servlet-request) - port (second (split host-port #":")) - dir (File. (str log-root - ServerUtils/FILE_PATH_SEPARATOR - topo-id - ServerUtils/FILE_PATH_SEPARATOR - port))] - (if (.exists dir) - (if (or (blank? (*STORM-CONF* UI-FILTER)) - (authorized-log-user? user - (str topo-id ServerUtils/FILE_PATH_SEPARATOR port ServerUtils/FILE_PATH_SEPARATOR "worker.log") - *STORM-CONF*)) - (html4 - [:head - [:title "File Dumps - Storm Log Viewer"] - (include-css "/css/bootstrap-3.3.1.min.css") - (include-css "/css/jquery.dataTables.1.10.4.min.css") - (include-css "/css/style.css")] - [:body - [:ul - (for [file (get-profiler-dump-files dir)] - [:li - [:a {:href (str "/dumps/" topo-id "/" host-port "/" file)} file ]])]]) - (unauthorized-user-html user)) - (-> (resp/response "Page not found") - (resp/status 404))))) - (GET "/daemonlog" [:as req & m] - (try - (.mark logviewer:num-daemonlog-page-http-requests) - (let [servlet-request (:servlet-request req) - daemonlog-root (:daemonlog-root req) - user (.getUserName http-creds-handler servlet-request) - start (if (:start m) (parse-long-from-map m :start)) - length (if (:length m) (parse-long-from-map m :length)) - file (URLDecoder/decode (:file m))] - (log-template (daemonlog-page file start length (:grep m) user daemonlog-root) - file user)) - (catch InvalidRequestException ex - (log-error ex) - (ring-response-from-exception ex)))) - (GET "/download" [:as {:keys [servlet-request servlet-response log-root]} & m] - (try - (.mark logviewer:num-download-log-file-http-requests) - (let [user (.getUserName http-creds-handler servlet-request) - file (URLDecoder/decode (:file m))] - (set-log-file-permissions file log-root) - (download-log-file file servlet-request servlet-response user log-root)) - (catch InvalidRequestException ex - (log-error ex) - (ring-response-from-exception ex)))) - (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m] - (try - (.mark logviewer:num-download-log-daemon-file-http-requests) - (let [user (.getUserName http-creds-handler servlet-request)] - (download-log-file file servlet-request servlet-response user daemonlog-root :is-daemon true)) - (catch InvalidRequestException ex - (log-error ex) - (ring-response-from-exception ex)))) - (GET "/search/:file" [:as {:keys [servlet-request servlet-response log-root daemonlog-root]} file & m] - ;; We do not use servlet-response here, but do not remove it from the - ;; :keys list, or this rule could stop working when an authentication - ;; filter is configured. - (try - (let [user (.getUserName http-creds-handler servlet-request) - is-daemon (= (:is-daemon m) "yes")] - (search-log-file (URLDecoder/decode file) - user - (if is-daemon daemonlog-root log-root) - is-daemon - (:search-string m) - (:num-matches m) - (:start-byte-offset m) - (:callback m) - (.getHeader servlet-request "Origin"))) - (catch InvalidRequestException ex - (log-error ex) - (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400)))) - (GET "/deepSearch/:topo-id" [:as {:keys [servlet-request servlet-response log-root]} topo-id & m] - ;; We do not use servlet-response here, but do not remove it from the - ;; :keys list, or this rule could stop working when an authentication - ;; filter is configured. - (try - (let [user (.getUserName http-creds-handler servlet-request)] - (deep-search-logs-for-topology topo-id - user - log-root - (:search-string m) - (:num-matches m) - (:port m) - (:start-file-offset m) - (:start-byte-offset m) - (:search-archived m) - (:callback m) - (.getHeader servlet-request "Origin"))) - (catch InvalidRequestException ex - (log-error ex) - (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400)))) - (GET "/searchLogs" [:as req & m] - (try - (let [servlet-request (:servlet-request req) - user (.getUserName http-creds-handler servlet-request)] - (list-log-files user - (:topoId m) - (:port m) - (:log-root req) - (:callback m) - (.getHeader servlet-request "Origin"))) - (catch InvalidRequestException ex - (log-error ex) - (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400)))) - (GET "/listLogs" [:as req & m] - (try - (.mark logviewer:num-list-logs-http-requests) - (let [servlet-request (:servlet-request req) - user (.getUserName http-creds-handler servlet-request)] - (list-log-files user - (:topoId m) - (:port m) - (:log-root req) - (:callback m) - (.getHeader servlet-request "Origin"))) - (catch InvalidRequestException ex - (log-error ex) - (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400)))) - (route/resources "/") - (route/not-found "Page not found")) - -(defn conf-middleware - "For passing the storm configuration with each request." - [app log-root daemonlog-root] - (fn [req] - (app (assoc req :log-root log-root :daemonlog-root daemonlog-root)))) - -(defn start-logviewer! [conf log-root-dir daemonlog-root-dir] - (try - (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES)) - filter-class (conf UI-FILTER) - filter-params (conf UI-FILTER-PARAMS) - logapp (handler/api (-> log-routes - requests-middleware)) ;; query params as map - middle (conf-middleware logapp log-root-dir daemonlog-root-dir) - filters-confs (if (conf UI-FILTER) - [(FilterConfiguration. filter-class (or (conf UI-FILTER-PARAMS) {}))] - []) - filters-confs (concat filters-confs - [(FilterConfiguration. "org.eclipse.jetty.servlets.GzipFilter" "Gzipper" {})]) - https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0)) - keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH) - keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD) - keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE) - key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD) - truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH) - truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD) - truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE) - want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH) - need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)] - (UIHelpers/stormRunJetty (int (conf LOGVIEWER-PORT)) - (reify IConfigurator (execute [this server] - (UIHelpers/configSsl server - https-port - keystore-path - keystore-pass - keystore-type - key-password - truststore-path - truststore-password - truststore-type - want-client-auth - need-client-auth) - (UIHelpers/configFilter server (ring.util.servlet/servlet middle) filters-confs))))) - (catch Exception ex - (log-error ex)))) - -(defn -main [] - (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) - log-root (ConfigUtils/workerArtifactsRoot conf) - daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))] - (Utils/setupDefaultUncaughtExceptionHandler) - (start-log-cleaner! conf log-root) - (log-message "Starting logviewer server for storm version '" - STORM-VERSION - "'") - (start-logviewer! conf log-root daemonlog-root) - (StormMetricsRegistry/startMetricsReporters conf))) http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/logviewer-search-context-tests.log.gz ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/logviewer-search-context-tests.log.gz b/storm-core/src/dev/logviewer-search-context-tests.log.gz deleted file mode 100644 index 5cf2a06..0000000 Binary files a/storm-core/src/dev/logviewer-search-context-tests.log.gz and /dev/null differ http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/logviewer-search-context-tests.log.test ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/logviewer-search-context-tests.log.test b/storm-core/src/dev/logviewer-search-context-tests.log.test deleted file mode 100644 index 6e4d4af..0000000 --- a/storm-core/src/dev/logviewer-search-context-tests.log.test +++ /dev/null @@ -1 +0,0 @@ -needle needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle needle http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/small-worker.log.test ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/small-worker.log.test b/storm-core/src/dev/small-worker.log.test deleted file mode 100644 index 27d61d1..0000000 --- a/storm-core/src/dev/small-worker.log.test +++ /dev/null @@ -1 +0,0 @@ -000000 needle 000000 http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/test-3072.log.test ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/test-3072.log.test b/storm-core/src/dev/test-3072.log.test deleted file mode 100644 index 56dc6f1..0000000 --- a/storm-core/src/dev/test-3072.log.test +++ /dev/null @@ -1,3 +0,0 @@ -This is a test log file of size 3072. - -..................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................... ..................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................... ..................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................... ....................................needle \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/test-worker.log.test ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/test-worker.log.test b/storm-core/src/dev/test-worker.log.test deleted file mode 100644 index 8fb4c53..0000000 --- a/storm-core/src/dev/test-worker.log.test +++ /dev/null @@ -1,380 +0,0 @@ -Test needle is near the beginning of the file. -This file assumes a buffer size of 2048 bytes, a max search string size of 1024 bytes, and a context length of 128 UTF-8 characters. -The early match tests the case when we find a match too close to the start of the file to give the normal before context strings. - -padding 5 -padding 6 -padding 7 -padding 8 -padding 9 -padding 10 -padding 11 -padding 12 -padding 13 -padding 14 -padding 15 -padding 16 -padding 17 -padding 18 -padding 19 -padding 20 -padding 21 -padding 22 -padding 23 -padding 24 -padding 25 -padding 26 -padding 27 -padding 28 -padding 29 -padding 30 -padding 31 -padding 32 -padding 33 -padding 34 -padding 35 -padding 36 -padding 37 -padding 38 -padding 39 -padding 40 -padding 41 -padding 42 -padding 43 -padding 44 -padding 45 -padding 46 -padding 47 -padding 48 -padding 49 -padding 50 -padding 51 -padding 52 -padding 53 -padding 54 -padding 55 -padding 56 -padding 57 -padding 58 -padding 59 -padding 60 -padding 61 -padding 62 -padding 63 -padding 64 -padding 65 -padding 66 -padding 67 -padding 68 -padding 69 -padding 70 -padding 71 -padding 72 -padding 73 -padding 74 -padding 75 -padding 76 -padding 77 -padding 78 -padding 79 -padding 80 -padding 81 -padding 82 -padding 83 -padding 84 -padding 85 -padding 86 -padding 87 -padding 88 -padding 89 -padding 90 -padding 91 -padding 92 -padding 93 -padding 94 -padding 95 -padding 96 -padding 97 -padding 98 -padding 99 -padding 100 -padding 101 -padding 102 -padding 103 -padding 104 -padding 105 -padding 106 -padding 107 -padding 108 -padding 109 -padding 110 -padding 111 -padding 112 -padding 113 -padding 114 -padding 115 -padding 116 -padding 117 -padding 118 -padding 119 -padding 120 -padding 121 -padding 122 -padding 123 -padding 124 -padding 125 -padding 126 -padding 127 -padding 128 -padding 129 -padding 130 -padding 131 -padding 132 -padding 133 -padding 134 -padding 135 -padding 136 -padding 137 -padding 138 -padding 139 -padding 140 -padding 141 -padding 142 -padding 143 -padding 144 -padding 145 -padding 146 -padding 147 -padding 148 -padding 149 -padding 150 -padding 151 -padding 152 -padding 153 -Near the end of a 1024 byte block, a needle. -A needle that straddles a 1024 byte boundary should also be detected. - -padding 157 -padding 158 -padding 159 -padding 160 -padding 161 -padding 162 -padding 163 -padding 164 -padding 165 -padding 166 -padding 167 -padding 168 -padding 169 -padding 170 -padding 171 -padding 172 -padding 173 -padding 174 -padding 175 -padding 176 -padding 177 -padding 178 -padding 179 -padding 180 -padding 181 -padding 182 -padding 183 -padding 184 -padding 185 -padding 186 -padding 187 -padding 188 -padding 189 -padding 190 -padding 191 -padding 192 -padding 193 -padding 194 -padding 195 -padding 196 -padding 197 -padding 198 -padding 199 -padding 200 -padding 201 -padding 202 -padding 203 -padding 204 -padding 205 -padding 206 -padding 207 -padding 208 -padding 209 -padding 210 -padding 211 -padding 212 -padding 213 -padding 214 -padding 215 -padding 216 -padding 217 -padding 218 -padding 219 -padding 220 -padding 221 -padding 222 -padding 223 -padding 224 -padding 225 -padding 226 -padding 227 -padding 228 -padding 229 -padding 230 -padding 231 -padding 232 -padding 233 -padding 234 -padding 235 - - -Here a needle occurs just after a 1024 byte boundary. It should have the correct context. - -Text with two adjoining matches: needleneedle - -padding 243 -padding 244 -padding 245 -padding 246 -padding 247 -padding 248 -padding 249 -padding 250 -padding 251 -padding 252 -padding 253 -padding 254 -padding 255 -padding 256 -padding 257 -padding 258 -padding 259 -padding 260 -padding 261 -padding 262 -padding 263 -padding 264 -padding 265 -padding 266 -padding 267 -padding 268 -padding 269 -padding 270 -padding 271 -padding 272 -padding 273 -padding 274 -padding 275 -padding 276 -padding 277 -padding 278 -padding 279 -padding 280 -padding 281 -padding 282 -padding 283 -padding 284 -padding 285 -padding 286 -padding 287 -padding 288 -padding 289 -padding 290 -padding 291 -padding 292 -padding 293 -padding 294 -padding 295 -padding 296 -padding 297 -padding 298 -padding 299 -padding 300 -padding 301 -padding 302 -padding 303 -padding 304 - -The following match of 1024 bytes completely fills half the byte buffer. It is a search substring of the maximum size...... - -XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXX -The following max-size match straddles a 1024 byte buffer. -XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXX - -Here are four non-ascii 1-byte UTF-8 characters: αβγδε - -needle - -Here are four printable 2-byte UTF-8 characters: ¡¢£¤¥ - -needle - - - -Here are four printable 3-byte UTF-8 characters: à¤à¤ à¤à¤à¤ - -needle - -Here are four printable 4-byte UTF-8 characters: ððððð - -needle - -Here are four of the same invalid UTF-8 characters: ���������������� - -needle - -padding 332 -padding 333 -padding 334 -padding 335 -padding 336 -padding 337 -padding 338 -padding 339 -padding 340 -padding 341 -padding 342 -padding 343 -padding 344 -padding 345 -padding 346 -padding 347 -padding 348 -padding 349 -padding 350 -padding 351 -padding 352 -padding 353 -padding 354 -padding 355 -padding 356 -padding 357 -padding 358 -padding 359 -padding 360 -padding 361 -padding 362 -padding 363 -padding 364 -padding 365 -padding 366 -padding 367 -padding 368 -padding 369 -padding 370 -padding 371 -padding 372 -padding 373 -padding 374 -padding 375 - -The following tests multibyte UTF-8 Characters straddling the byte boundary: ððð - -needle \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java b/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java deleted file mode 100644 index dc76157..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon; - -import java.io.IOException; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.DirectoryStream; -import java.util.Stack; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.Comparator; -import java.util.PriorityQueue; -import java.util.regex.Pattern; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provide methods to help Logviewer to clean up - * files in directories and to get a list of files without - * worrying about excessive memory usage. - * - */ -public class DirectoryCleaner { - private static final Logger LOG = LoggerFactory.getLogger(DirectoryCleaner.class); - // used to recognize the pattern of active log files, we may remove the "current" from this list - private static final Pattern ACTIVE_LOG_PATTERN = Pattern.compile(".*\\.(log|err|out|current|yaml|pid)$"); - // used to recognize the pattern of some meta files in a worker log directory - private static final Pattern META_LOG_PATTERN= Pattern.compile(".*\\.(yaml|pid)$"); - - // not defining this as static is to allow for mocking in tests - public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException { - DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath()); - return stream; - } - - /** - * If totalSize of files exceeds the either the per-worker quota or global quota, - * Logviewer deletes oldest inactive log files in a worker directory or in all worker dirs. - * We use the parameter forPerDir to switch between the two deletion modes. - * @param dirs the list of directories to be scanned for deletion - * @param quota the per-dir quota or the total quota for the all directories - * @param forPerDir if true, deletion happens for a single dir; otherwise, for all directories globally - * @param activeDirs only for global deletion, we want to skip the active logs in activeDirs - * @return number of files deleted - */ - public int deleteOldestWhileTooLarge(List<File> dirs, - long quota, boolean forPerDir, Set<String> activeDirs) throws IOException { - final int PQ_SIZE = 1024; // max number of files to delete for every round - final int MAX_ROUNDS = 512; // max rounds of scanning the dirs - long totalSize = 0; - int deletedFiles = 0; - - for (File dir : dirs) { - try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { - for (Path path : stream) { - File file = path.toFile(); - - if (isFileEligibleToSkipDelete(forPerDir, activeDirs, dir, file)) { - continue; // skip adding length - } - - totalSize += file.length(); - } - } - } - - LOG.debug("totalSize: {} quota: {}", totalSize, quota); - long toDeleteSize = totalSize - quota; - if (toDeleteSize <= 0) { - return deletedFiles; - } - - Comparator<File> comparator = new Comparator<File>() { - public int compare(File f1, File f2) { - if (f1.lastModified() > f2.lastModified()) { - return -1; - } else { - return 1; - } - } - }; - // the oldest pq_size files in this directory will be placed in PQ, with the newest at the root - PriorityQueue<File> pq = new PriorityQueue<File>(PQ_SIZE, comparator); - int round = 0; - while (toDeleteSize > 0) { - LOG.debug("To delete size is {}, start a new round of deletion, round: {}", toDeleteSize, round); - for (File dir : dirs) { - try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { - for (Path path : stream) { - File file = path.toFile(); - if (isFileEligibleToSkipDelete(forPerDir, activeDirs, dir, file)) { - continue; - } - if (pq.size() < PQ_SIZE) { - pq.offer(file); - } else { - if (file.lastModified() < pq.peek().lastModified()) { - pq.poll(); - pq.offer(file); - } - } - } - } - } - // need to reverse the order of elements in PQ to delete files from oldest to newest - Stack<File> stack = new Stack<File>(); - while (!pq.isEmpty()) { - File file = pq.poll(); - stack.push(file); - } - while (!stack.isEmpty() && toDeleteSize > 0) { - File file = stack.pop(); - toDeleteSize -= file.length(); - LOG.info("Delete file: {}, size: {}, lastModified: {}", file.getCanonicalPath(), file.length(), file.lastModified()); - file.delete(); - deletedFiles++; - } - pq.clear(); - round++; - if (round >= MAX_ROUNDS) { - if (forPerDir) { - LOG.warn("Reach the MAX_ROUNDS: {} during per-dir deletion, you may have too many files in " + - "a single directory : {}, will delete the rest files in next interval.", - MAX_ROUNDS, dirs.get(0).getCanonicalPath()); - } else { - LOG.warn("Reach the MAX_ROUNDS: {} during global deletion, you may have too many files, " + - "will delete the rest files in next interval.", MAX_ROUNDS); - } - break; - } - } - return deletedFiles; - } - - private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException { - if (forPerDir) { - if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { - return true; - } - } else { // for global cleanup - if (activeDirs.contains(dir.getCanonicalPath())) { // for an active worker's dir, make sure for the last "/" - if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { - return true; - } - } else { - if (META_LOG_PATTERN.matcher(file.getName()).matches()) { - return true; - } - } - } - return false; - } - - // Note that to avoid memory problem, we only return the first 1024 files in a directory - public static List<File> getFilesForDir(File dir) throws IOException { - List<File> files = new ArrayList<File>(); - final int MAX_NUM = 1024; - - try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath())) { - for (Path path : stream) { - files.add(path.toFile()); - if (files.size() >= MAX_NUM) { - break; - } - } - } - return files; - } -}
