http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/test/clj/org/apache/storm/logviewer_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj deleted file mode 100644 index ae9b651..0000000 --- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj +++ /dev/null @@ -1,824 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns org.apache.storm.logviewer-test - (:use [org.apache.storm daemon-config config util]) - (:require [org.apache.storm.daemon [logviewer :as logviewer]]) - (:require [conjure.core]) - (:use [clojure test]) - (:use [conjure core]) - (:use [org.apache.storm.ui helpers]) - (:import [org.apache.storm.daemon DirectoryCleaner] - [org.apache.storm.utils Time Utils] - [org.apache.storm.utils.staticmocking UtilsInstaller] - [org.apache.storm.daemon.supervisor SupervisorUtils] - [org.apache.storm.testing.staticmocking MockedSupervisorUtils] - [org.apache.storm.generated LSWorkerHeartbeat]) - (:import [java.nio.file Files Path DirectoryStream]) - (:import [java.nio.file Files]) - (:import [java.nio.file.attribute FileAttribute]) - (:import [java.io File]) - (:import [java.util ArrayList]) - (:import [org.mockito Mockito])) - -(defn mk-mock-Path [file] - (let [mockPath (Mockito/mock java.nio.file.Path)] - (. (Mockito/when (.toFile mockPath)) thenReturn file) - mockPath)) - -(defn mk-DirectoryStream [^ArrayList list-of-paths] - (reify DirectoryStream - (close [this]) - (iterator [this] - (.iterator list-of-paths)))) - -(defmulti mk-mock-File #(:type %)) - -(defmethod mk-mock-File :file [{file-name :name mtime :mtime length :length - :or {file-name "afile" - mtime 1 - length (* 10 (* 1024 (* 1024 1024))) }}] ; Length 10 GB - (let [mockFile (Mockito/mock java.io.File)] - (. (Mockito/when (.getName mockFile)) thenReturn file-name) - (. (Mockito/when (.lastModified mockFile)) thenReturn mtime) - (. (Mockito/when (.isFile mockFile)) thenReturn true) - (. (Mockito/when (.getCanonicalPath mockFile)) - thenReturn (str "/mock/canonical/path/to/" file-name)) - (. (Mockito/when (.length mockFile)) thenReturn length) - mockFile)) - -(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime files :files - :or {dir-name "adir" mtime 1 files []}}] - (let [mockDir (Mockito/mock java.io.File)] - (. (Mockito/when (.getName mockDir)) thenReturn dir-name) - (. 
(Mockito/when (.lastModified mockDir)) thenReturn mtime) - (. (Mockito/when (.isFile mockDir)) thenReturn false) - (. (Mockito/when (.listFiles mockDir)) thenReturn (into-array File files)) - (. (Mockito/when (.getCanonicalPath mockDir)) thenReturn dir-name) - mockDir)) - -(deftest test-get-size-for-logdir - (testing "get the file sizes of a worker log directory" - (stubbing [logviewer/get-stream-for-dir (fn [x] (map #(mk-mock-Path %) (.listFiles x)))] - (let [now-millis (Time/currentTimeMillis) - files1 (into-array File (map #(mk-mock-File {:name (str %) - :type :file - :mtime (- now-millis (* 100 %)) - :length 200}) - (range 1 6))) ;; 5 files - port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1" - :type :directory - :files files1})] - (is (= (logviewer/get-size-for-logdir port1-dir) 1000)))))) - -(deftest test-mk-FileFilter-for-log-cleanup - (testing "log file filter selects the correct worker-log dirs for purge" - (stubbing [logviewer/get-stream-for-dir (fn [x] (map #(mk-mock-Path %) (.listFiles x)))] - (let [now-millis (Time/currentTimeMillis) - conf {LOGVIEWER-CLEANUP-AGE-MINS 60 - LOGVIEWER-CLEANUP-INTERVAL-SECS 300} - cutoff-millis (logviewer/cleanup-cutoff-age-millis conf now-millis) - old-mtime-millis (- cutoff-millis 500) - new-mtime-millis (+ cutoff-millis 500) - matching-files (map #(mk-mock-File %) - [{:name "3031" - :type :directory - :mtime old-mtime-millis} - {:name "3032" - :type :directory - :mtime old-mtime-millis} - {:name "7077" - :type :directory - :mtime old-mtime-millis}]) - excluded-files (map #(mk-mock-File %) - [{:name "oldlog-1-2-worker-.log" - :type :file - :mtime old-mtime-millis} - {:name "newlog-1-2-worker.log" - :type :file - :mtime new-mtime-millis} - {:name "some-old-file.txt" - :type :file - :mtime old-mtime-millis} - {:name "olddir-1-2-worker.log" - :type :directory - :mtime new-mtime-millis} - {:name "metadata" - :type :directory - :mtime new-mtime-millis} - {:name "newdir" - :type :directory - :mtime 
new-mtime-millis} - ]) - file-filter (logviewer/mk-FileFilter-for-log-cleanup conf now-millis)] - (is (every? #(.accept file-filter %) matching-files)) - (is (not-any? #(.accept file-filter %) excluded-files)) - )))) - -(deftest test-per-workerdir-cleanup! - (testing "cleaner deletes oldest files in each worker dir if files are larger than per-dir quota." - (with-open [_ (UtilsInstaller. (proxy [Utils] [] - (forceDeleteImpl [path])))] - (let [cleaner (proxy [org.apache.storm.daemon.DirectoryCleaner] [] - (getStreamForDirectory - ([^File dir] - (mk-DirectoryStream - (ArrayList. - (map #(mk-mock-Path %) (.listFiles dir))))))) - now-millis (Time/currentTimeMillis) - files1 (into-array File (map #(mk-mock-File {:name (str "A" %) - :type :file - :mtime (+ now-millis (* 100 %)) - :length 200 }) - (range 0 10))) - files2 (into-array File (map #(mk-mock-File {:name (str "B" %) - :type :file - :mtime (+ now-millis (* 100 %)) - :length 200 }) - (range 0 10))) - files3 (into-array File (map #(mk-mock-File {:name (str "C" %) - :type :file - :mtime (+ now-millis (* 100 %)) - :length 200 }) - (range 0 10))) - port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1" - :type :directory - :files files1}) - port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2" - :type :directory - :files files2}) - port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3" - :type :directory - :files files3}) - topo1-files (into-array File [port1-dir port2-dir]) - topo2-files (into-array File [port3-dir]) - topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1" - :type :directory - :files topo1-files}) - topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2" - :type :directory - :files topo2-files}) - root-files (into-array File [topo1-dir topo2-dir]) - root-dir (mk-mock-File {:name "/workers-artifacts" - :type :directory - :files root-files}) - deletedFiles (logviewer/per-workerdir-cleanup! 
root-dir 1200 cleaner)] - (is (= (first deletedFiles) 4)) - (is (= (second deletedFiles) 4)) - (is (= (last deletedFiles) 4)))))) - -(deftest test-global-log-cleanup! - (testing "cleaner deletes oldest when files' sizes are larger than the global quota." - (stubbing [logviewer/get-alive-worker-dirs ["/workers-artifacts/topo1/port1"]] - (with-open [_ (UtilsInstaller. (proxy [Utils] [] - (forceDeleteImpl [path])))] - (let [cleaner (proxy [org.apache.storm.daemon.DirectoryCleaner] [] - (getStreamForDirectory - ([^File dir] - (mk-DirectoryStream - (ArrayList. - (map #(mk-mock-Path %) (.listFiles dir))))))) - now-millis (Time/currentTimeMillis) - files1 (into-array File (map #(mk-mock-File {:name (str "A" % ".log") - :type :file - :mtime (+ now-millis (* 100 %)) - :length 200 }) - (range 0 10))) - files2 (into-array File (map #(mk-mock-File {:name (str "B" %) - :type :file - :mtime (+ now-millis (* 100 %)) - :length 200 }) - (range 0 10))) - files3 (into-array File (map #(mk-mock-File {:name (str "C" %) - :type :file - :mtime (+ now-millis (* 100 %)) - :length 200 }) - (range 0 10))) - port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1" - :type :directory - :files files1}) ;; note that port1-dir is active worker containing active logs - port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2" - :type :directory - :files files2}) - port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3" - :type :directory - :files files3}) - topo1-files (into-array File [port1-dir port2-dir]) - topo2-files (into-array File [port3-dir]) - topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1" - :type :directory - :files topo1-files}) - topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2" - :type :directory - :files topo2-files}) - root-files (into-array File [topo1-dir topo2-dir]) - root-dir (mk-mock-File {:name "/workers-artifacts" - :type :directory - :files root-files}) - deletedFiles (logviewer/global-log-cleanup! 
root-dir 2400 cleaner)] - (is (= deletedFiles 18))))))) - -(deftest test-identify-worker-log-dirs - (testing "Build up workerid-workerlogdir map for the old workers' dirs" - (let [port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1" - :type :directory}) - mock-metaFile (mk-mock-File {:name "worker.yaml" - :type :file}) - exp-id "id12345" - expected {exp-id port1-dir} - supervisor-util (Mockito/mock SupervisorUtils)] - (with-open [_ (MockedSupervisorUtils. supervisor-util)] - (stubbing [logviewer/get-metadata-file-for-wroker-logdir mock-metaFile - logviewer/get-worker-id-from-metadata-file exp-id] - (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn nil)) - (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))) - -(deftest test-get-dead-worker-dirs - (testing "return directories for workers that are not alive" - (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5} - hb (let [lwb (LSWorkerHeartbeat.)] - (.set_time_secs lwb (int 1)) lwb) - id->hb {"42" hb} - now-secs 2 - unexpected-dir1 (mk-mock-File {:name "dir1" :type :directory}) - expected-dir2 (mk-mock-File {:name "dir2" :type :directory}) - expected-dir3 (mk-mock-File {:name "dir3" :type :directory}) - log-dirs #{unexpected-dir1 expected-dir2 expected-dir3} - supervisor-util (Mockito/mock SupervisorUtils)] - (with-open [_ (MockedSupervisorUtils. supervisor-util)] - (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir1, - "007" expected-dir2, - "" expected-dir3}] ;; this tests a directory with no yaml file thus no worker id - (. 
(Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn id->hb)) - (is (= #{expected-dir2 expected-dir3} - (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))) - -(deftest test-cleanup-fn - (testing "cleanup function forceDeletes files of dead workers" - (let [mockfile1 (mk-mock-File {:name "delete-me1" :type :file}) - mockfile2 (mk-mock-File {:name "delete-me2" :type :file}) - forceDelete-args (atom []) - utils-proxy (proxy [Utils] [] - (forceDeleteImpl [path] - (swap! forceDelete-args conj path)))] - (with-open [_ (UtilsInstaller. utils-proxy)] - (stubbing [logviewer/select-dirs-for-cleanup nil - logviewer/get-dead-worker-dirs (sorted-set mockfile1 mockfile2) - logviewer/cleanup-empty-topodir! nil] - (logviewer/cleanup-fn! "/bogus/path") - (is (= 2 (count @forceDelete-args))) - (is (= (.getCanonicalPath mockfile1) (get @forceDelete-args 0))) - (is (= (.getCanonicalPath mockfile2) (get @forceDelete-args 1)))))))) - -(deftest test-authorized-log-user - (testing "allow cluster admin" - (let [conf {NIMBUS-ADMINS ["alice"]}] - (stubbing [logviewer/get-log-user-group-whitelist [[] []] - logviewer/user-groups []] - (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf)) - (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname") - (verify-first-call-args-for logviewer/user-groups "alice")))) - - (testing "ignore any cluster-set topology.users topology.groups" - (let [conf {TOPOLOGY-USERS ["alice"] - TOPOLOGY-GROUPS ["alice-group"]}] - (stubbing [logviewer/get-log-user-group-whitelist [[] []] - logviewer/user-groups ["alice-group"]] - (is (not (logviewer/authorized-log-user? 
"alice" "non-blank-fname" conf))) - (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname") - (verify-first-call-args-for logviewer/user-groups "alice")))) - - (testing "allow cluster logs user" - (let [conf {LOGS-USERS ["alice"]}] - (stubbing [logviewer/get-log-user-group-whitelist [[] []] - logviewer/user-groups []] - (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf)) - (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname") - (verify-first-call-args-for logviewer/user-groups "alice")))) - - (testing "allow whitelisted topology user" - (stubbing [logviewer/get-log-user-group-whitelist [["alice"] []] - logviewer/user-groups []] - (is (logviewer/authorized-log-user? "alice" "non-blank-fname" {})) - (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname") - (verify-first-call-args-for logviewer/user-groups "alice"))) - - (testing "allow whitelisted topology group" - (stubbing [logviewer/get-log-user-group-whitelist [[] ["alice-group"]] - logviewer/user-groups ["alice-group"]] - (is (logviewer/authorized-log-user? "alice" "non-blank-fname" {})) - (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname") - (verify-first-call-args-for logviewer/user-groups "alice"))) - - (testing "disallow user not in nimbus admin, topo user, logs user, or whitelist" - (stubbing [logviewer/get-log-user-group-whitelist [[] []] - logviewer/user-groups []] - (is (not (logviewer/authorized-log-user? 
"alice" "non-blank-fname" {}))) - (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname") - (verify-first-call-args-for logviewer/user-groups "alice")))) - -(deftest test-list-log-files - (testing "list-log-files filter selects the correct log files to return" - (let [attrs (make-array FileAttribute 0) - root-path (.getCanonicalPath (.toFile (Files/createTempDirectory "workers-artifacts" attrs))) - file1 (clojure.java.io/file root-path "topoA" "port1" "worker.log") - file2 (clojure.java.io/file root-path "topoA" "port2" "worker.log") - file3 (clojure.java.io/file root-path "topoB" "port1" "worker.log") - _ (clojure.java.io/make-parents file1) - _ (clojure.java.io/make-parents file2) - _ (clojure.java.io/make-parents file3) - _ (.createNewFile file1) - _ (.createNewFile file2) - _ (.createNewFile file3) - origin "www.origin.server.net" - expected-all (json-response '("topoA/port1/worker.log" "topoA/port2/worker.log" - "topoB/port1/worker.log") - nil - :headers {"Access-Control-Allow-Origin" origin - "Access-Control-Allow-Credentials" "true"}) - expected-filter-port (json-response '("topoA/port1/worker.log" "topoB/port1/worker.log") - nil - :headers {"Access-Control-Allow-Origin" origin - "Access-Control-Allow-Credentials" "true"}) - expected-filter-topoId (json-response '("topoB/port1/worker.log") - nil - :headers {"Access-Control-Allow-Origin" origin - "Access-Control-Allow-Credentials" "true"}) - returned-all (logviewer/list-log-files "user" nil nil root-path nil origin) - returned-filter-port (logviewer/list-log-files "user" nil "port1" root-path nil origin) - returned-filter-topoId (logviewer/list-log-files "user" "topoB" nil root-path nil origin)] - (Utils/forceDelete root-path) - (is (= expected-all returned-all)) - (is (= expected-filter-port returned-filter-port)) - (is (= expected-filter-topoId returned-filter-topoId))))) - -(deftest test-search-via-rest-api - (testing "Throws if bogus file is given" - (thrown-cause? 
java.lang.RuntimeException - (logviewer/substring-search nil "a string"))) - - (let [pattern "needle" - expected-host "dev.null.invalid" - expected-port 8888 - ;; When we click a link to the logviewer, we expect the match line to - ;; be somewhere near the middle of the page. So we subtract half of - ;; the default page length from the offset at which we found the - ;; match. - exp-offset-fn #(- (/ logviewer/default-bytes-per-page 2) %)] - - (stubbing [logviewer/logviewer-port expected-port] - (with-open [_ (UtilsInstaller. (proxy [Utils] [] - (hostnameImpl [] expected-host)))] - (testing "Logviewer link centers the match in the page" - (let [expected-fname "foobar.log"] - (is (= (str "http://" - expected-host - ":" - expected-port - "/log?file=" - expected-fname - "&start=1947&length=" - logviewer/default-bytes-per-page) - (logviewer/url-to-match-centered-in-log-page (byte-array 42) - expected-fname - 27526 - 8888))))) - - (testing "Logviewer link centers the match in the page (daemon)" - (let [expected-fname "foobar.log"] - (is (= (str "http://" - expected-host - ":" - expected-port - "/daemonlog?file=" - expected-fname - "&start=1947&length=" - logviewer/default-bytes-per-page) - (logviewer/url-to-match-centered-in-log-page-daemon-file (byte-array 42) - expected-fname - 27526 - 8888))))) - - (let [file (->> "logviewer-search-context-tests.log.test" - (clojure.java.io/file "src" "dev"))] - (testing "returns correct before/after context" - (is (= {"isDaemon" "no" - "searchString" pattern - "startByteOffset" 0 - "matches" [{"byteOffset" 0 - "beforeString" "" - "afterString" " needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle " - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 7 - "beforeString" "needle " - "afterString" 
"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle needle\n" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 127 - "beforeString" "needle needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" - "afterString" " needle\n" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 134 - "beforeString" " needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle " - "afterString" "\n" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - ]} - (logviewer/substring-search file pattern))))) - - (let [file (clojure.java.io/file "src" "dev" "small-worker.log.test")] - (testing "a really small log file" - (is (= {"isDaemon" "no" - "searchString" pattern - "startByteOffset" 0 - "matches" [{"byteOffset" 7 - "beforeString" "000000 " - "afterString" " 000000\n" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")}]} - (logviewer/substring-search file pattern))))) - - (let [file (clojure.java.io/file "src" "dev" "small-worker.log.test")] - (testing "a really small log file (daemon)" - (is (= {"isDaemon" "yes" - "searchString" pattern - "startByteOffset" 0 - "matches" [{"byteOffset" 7 - "beforeString" "000000 " - "afterString" " 000000\n" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/daemonlog?file=" - (.getName file) - "&start=0&length=51200")}]} - 
(logviewer/substring-search file pattern :is-daemon true))))) - - (let [file (clojure.java.io/file "src" "dev" "test-3072.log.test")] - (testing "no offset returned when file ends on buffer offset" - (let [expected - {"isDaemon" "no" - "searchString" pattern - "startByteOffset" 0 - "matches" [{"byteOffset" 3066 - "beforeString" (->> - (repeat 128 '.) - clojure.string/join) - "afterString" "" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")}]}] - (is (= expected - (logviewer/substring-search file pattern))) - (is (= expected - (logviewer/substring-search file pattern :num-matches 1)))))) - - (let [file (clojure.java.io/file "src" "dev" "test-worker.log.test")] - - (testing "next byte offsets are correct for each match" - (doseq [[num-matches-sought - num-matches-found - expected-next-byte-offset] [[1 1 11] - [2 2 2042] - [3 3 2052] - [4 4 3078] - [5 5 3196] - [6 6 3202] - [7 7 6252] - [8 8 6321] - [9 9 6397] - [10 10 6476] - [11 11 6554] - [12 12 nil] - [13 12 nil]]] - (let [result - (logviewer/substring-search file - pattern - :num-matches num-matches-sought)] - (is (= expected-next-byte-offset - (get result "nextByteOffset"))) - (is (= num-matches-found (count (get result "matches"))))))) - - (is - (= {"isDaemon" "no" - "nextByteOffset" 6252 - "searchString" pattern - "startByteOffset" 0 - "matches" [ - {"byteOffset" 5 - "beforeString" "Test " - "afterString" " is near the beginning of the file.\nThis file assumes a buffer size of 2048 bytes, a max search string size of 1024 bytes, and a" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 2036 - "beforeString" "ng 146\npadding 147\npadding 148\npadding 149\npadding 150\npadding 151\npadding 152\npadding 153\nNear the end of a 1024 byte block, a " - "afterString" ".\nA 
needle that straddles a 1024 byte boundary should also be detected.\n\npadding 157\npadding 158\npadding 159\npadding 160\npadding" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 2046 - "beforeString" "ding 147\npadding 148\npadding 149\npadding 150\npadding 151\npadding 152\npadding 153\nNear the end of a 1024 byte block, a needle.\nA " - "afterString" " that straddles a 1024 byte boundary should also be detected.\n\npadding 157\npadding 158\npadding 159\npadding 160\npadding 161\npaddi" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 3072 - "beforeString" "adding 226\npadding 227\npadding 228\npadding 229\npadding 230\npadding 231\npadding 232\npadding 233\npadding 234\npadding 235\n\n\nHere a " - "afterString" " occurs just after a 1024 byte boundary. It should have the correct context.\n\nText with two adjoining matches: needleneedle\n\npa" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 3190 - "beforeString" "\n\n\nHere a needle occurs just after a 1024 byte boundary. It should have the correct context.\n\nText with two adjoining matches: " - "afterString" "needle\n\npadding 243\npadding 244\npadding 245\npadding 246\npadding 247\npadding 248\npadding 249\npadding 250\npadding 251\npadding 252\n" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 3196 - "beforeString" "e a needle occurs just after a 1024 byte boundary. 
It should have the correct context.\n\nText with two adjoining matches: needle" - "afterString" "\n\npadding 243\npadding 244\npadding 245\npadding 246\npadding 247\npadding 248\npadding 249\npadding 250\npadding 251\npadding 252\npaddin" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 6246 - "beforeString" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\nHere are four non-ascii 1-byte UTF-8 characters: αβγδε\n\n" - "afterString" "\n\nHere are four printable 2-byte UTF-8 characters: ¡¢£¤¥\n\nneedle\n\n\n\nHere are four printable 3-byte UTF-8 characters: à¤à¤ " - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - ]} - (logviewer/substring-search file pattern :num-matches 7))) - - (testing "Correct match offset is returned when skipping bytes" - (let [start-byte-offset 3197] - (is (= {"isDaemon" "no" - "nextByteOffset" 6252 - "searchString" pattern - "startByteOffset" start-byte-offset - "matches" [{"byteOffset" 6246 - "beforeString" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\nHere are four non-ascii 1-byte UTF-8 characters: αβγδε\n\n" - "afterString" "\n\nHere are four printable 2-byte UTF-8 characters: ¡¢£¤¥\n\nneedle\n\n\n\nHere are four printable 3-byte UTF-8 characters: à¤à¤ " - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")}]} - (logviewer/substring-search file - pattern - :num-matches 1 - :start-byte-offset start-byte-offset))))) - - (let [pattern (clojure.string/join (repeat 1024 'X))] - (is - (= {"isDaemon" "no" - "nextByteOffset" 6183 - "searchString" pattern - "startByteOffset" 0 - "matches" [ - {"byteOffset" 4075 - "beforeString" "\n\nThe following 
match of 1024 bytes completely fills half the byte buffer. It is a search substring of the maximum size......\n\n" - "afterString" "\nThe following max-size match straddles a 1024 byte buffer.\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - {"byteOffset" 5159 - "beforeString" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nThe following max-size match straddles a 1024 byte buffer.\n" - "afterString" "\n\nHere are four non-ascii 1-byte UTF-8 characters: αβγδε\n\nneedle\n\nHere are four printable 2-byte UTF-8 characters: ¡¢£¤" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - ]} - (logviewer/substring-search file pattern :num-matches 2)))) - - (let [pattern "ððð"] - (is - (= {"isDaemon" "no" - "nextByteOffset" 7176 - "searchString" pattern - "startByteOffset" 0 - "matches" [ - {"byteOffset" 7164 - "beforeString" "padding 372\npadding 373\npadding 374\npadding 375\n\nThe following tests multibyte UTF-8 Characters straddling the byte boundary: " - "afterString" "\n\nneedle" - "matchString" pattern - "logviewerURL" (str "http://" - expected-host - ":" - expected-port - "/log?file=src%2Fdev%2F" - (.getName file) - "&start=0&length=51200")} - ]} - (logviewer/substring-search file pattern :num-matches 1)))) - - (testing "Returns 0 matches for unseen pattern" - (let [pattern "Not There"] - (is (= {"isDaemon" "no" - "searchString" pattern - "startByteOffset" 0 - "matches" []} - (logviewer/substring-search file - pattern - :num-matches nil - :start-byte-offset nil)))))))))) - -(deftest test-find-n-matches - (testing "find-n-matches looks through logs properly" - (let [files [(clojure.java.io/file "src" "dev" 
"logviewer-search-context-tests.log.test") - (clojure.java.io/file "src" "dev" "logviewer-search-context-tests.log.gz")] - matches1 ((logviewer/find-n-matches files 20 0 0 "needle") "matches") - matches2 ((logviewer/find-n-matches files 20 0 126 "needle") "matches") - matches3 ((logviewer/find-n-matches files 20 1 0 "needle") "matches")] - - (is (= 2 (count matches1))) - (is (= 4 (count ((first matches1) "matches")))) - (is (= 4 (count ((second matches1) "matches")))) - (is (= ((first matches1) "fileName") "src/dev/logviewer-search-context-tests.log.test")) - (is (= ((second matches1) "fileName") "src/dev/logviewer-search-context-tests.log.gz")) - - (is (= 2 (count ((first matches2) "matches")))) - (is (= 4 (count ((second matches2) "matches")))) - - (is (= 1 (count matches3))) - (is (= 4 (count ((first matches3) "matches"))))))) - -(deftest test-deep-search-logs-for-topology - (let [files [(clojure.java.io/file "src" "dev" "logviewer-search-context-tests.log.test") - (clojure.java.io/file "src" "dev" "logviewer-search-context-tests.log.gz")] - attrs (make-array FileAttribute 0) - topo-path (.getCanonicalPath (.toFile (Files/createTempDirectory "topoA" attrs))) - _ (.createNewFile (clojure.java.io/file topo-path "6400")) - _ (.createNewFile (clojure.java.io/file topo-path "6500")) - _ (.createNewFile (clojure.java.io/file topo-path "6600")) - _ (.createNewFile (clojure.java.io/file topo-path "6700"))] - (stubbing - [logviewer/logs-for-port files - logviewer/find-n-matches nil] - (testing "deep-search-logs-for-topology all-ports search-archived = true" - (instrumenting - [logviewer/find-n-matches - logviewer/logs-for-port] - (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "*" "20" "199" true nil nil) - (verify-call-times-for logviewer/find-n-matches 4) - (verify-call-times-for logviewer/logs-for-port 4) - ; File offset and byte offset should always be zero when searching multiple workers (multiple ports). 
- (verify-nth-call-args-for 1 logviewer/find-n-matches files 20 0 0 "search") - (verify-nth-call-args-for 2 logviewer/find-n-matches files 20 0 0 "search") - (verify-nth-call-args-for 3 logviewer/find-n-matches files 20 0 0 "search") - (verify-nth-call-args-for 4 logviewer/find-n-matches files 20 0 0 "search"))) - (testing "deep-search-logs-for-topology all-ports search-archived = false" - (instrumenting - [logviewer/find-n-matches - logviewer/logs-for-port] - (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" nil "20" "199" nil nil nil) - (verify-call-times-for logviewer/find-n-matches 4) - (verify-call-times-for logviewer/logs-for-port 4) - ; File offset and byte offset should always be zero when searching multiple workers (multiple ports). - (verify-nth-call-args-for 1 logviewer/find-n-matches [(first files)] 20 0 0 "search") - (verify-nth-call-args-for 2 logviewer/find-n-matches [(first files)] 20 0 0 "search") - (verify-nth-call-args-for 3 logviewer/find-n-matches [(first files)] 20 0 0 "search") - (verify-nth-call-args-for 4 logviewer/find-n-matches [(first files)] 20 0 0 "search"))) - (testing "deep-search-logs-for-topology one-port search-archived = true, no file-offset" - (instrumenting - [logviewer/find-n-matches - logviewer/logs-for-port] - (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "0" "0" true nil nil) - (verify-call-times-for logviewer/find-n-matches 1) - (verify-call-times-for logviewer/logs-for-port 2) - (verify-nth-call-args-for 1 logviewer/find-n-matches files 20 0 0 "search"))) - (testing "deep-search-logs-for-topology one-port search-archived = true, file-offset = 1" - (instrumenting - [logviewer/find-n-matches - logviewer/logs-for-port] - (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "1" "0" true nil nil) - (verify-call-times-for logviewer/find-n-matches 1) - (verify-call-times-for logviewer/logs-for-port 2) - (verify-nth-call-args-for 1 
logviewer/find-n-matches files 20 1 0 "search"))) - (testing "deep-search-logs-for-topology one-port search-archived = false, file-offset = 1" - (instrumenting - [logviewer/find-n-matches - logviewer/logs-for-port] - (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "1" "0" nil nil nil) - (verify-call-times-for logviewer/find-n-matches 1) - (verify-call-times-for logviewer/logs-for-port 2) - ; File offset should be zero, since search-archived is false. - (verify-nth-call-args-for 1 logviewer/find-n-matches [(first files)] 20 0 0 "search"))) - (testing "deep-search-logs-for-topology one-port search-archived = true, file-offset = 1, byte-offset = 100" - (instrumenting - [logviewer/find-n-matches - logviewer/logs-for-port] - (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "1" "100" true nil nil) - (verify-call-times-for logviewer/find-n-matches 1) - (verify-call-times-for logviewer/logs-for-port 2) - ; File offset should be zero, since search-archived is false. - (verify-nth-call-args-for 1 logviewer/find-n-matches files 20 1 100 "search"))) - (testing "deep-search-logs-for-topology bad-port search-archived = false, file-offset = 1" - (instrumenting - [logviewer/find-n-matches - logviewer/logs-for-port] - (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "2700" "1" "0" nil nil nil) - ; Called with a bad port (not in the config) No searching should be done. - (verify-call-times-for logviewer/find-n-matches 0) - (verify-call-times-for logviewer/logs-for-port 0))) - (Utils/forceDelete topo-path)))) -
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java
new file mode 100644
index 0000000..612e0e2
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer;
+
+/**
+ * Constants shared across the logviewer daemon.
+ */
+public final class LogviewerConstant {
+    /** Number of bytes served per log page when the request does not specify a length. */
+    public static final int DEFAULT_BYTES_PER_PAGE = 51200;
+
+    private LogviewerConstant() {
+        // constants holder; never instantiated
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
new file mode 100644
index 0000000..2b26702
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer;
+
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.LogCleaner;
+import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.resource.Resource;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES;
+
+/**
+ * Jetty server of the logviewer daemon. It serves the static resources under
+ * {@link #STATIC_RESOURCE_DIRECTORY_PATH} at "/" and the Jersey application
+ * {@link LogviewerApplication} at "/api/v1/*".
+ */
+public class LogviewerServer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
+    // Registered under the logviewer prefix; the "drpc:" prefix used before was a copy-paste leftover.
+    private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("logviewer:num-shutdown-calls");
+    public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
+
+    /**
+     * Builds (but does not start) the Jetty server described by the given configuration.
+     *
+     * @param conf daemon configuration; the LOGVIEWER_* and UI_FILTER* entries are read
+     * @return the configured server, or {@code null} when no non-negative logviewer port is configured
+     */
+    private static Server mkHttpServer(Map<String, Object> conf) {
+        Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT);
+        Server ret = null;
+        if (logviewerHttpPort != null && logviewerHttpPort >= 0) {
+            LOG.info("Starting Logviewer HTTP servers...");
+            // NOTE(review): read but not passed on to the server below - confirm whether it should be applied.
+            Integer headerBufferSize = ObjectReader.getInt(conf.get(UI_HEADER_BUFFER_BYTES));
+            String filterClass = (String) (conf.get(DaemonConfig.UI_FILTER));
+            @SuppressWarnings("unchecked")
+            Map<String, String> filterParams = (Map<String, String>) (conf.get(DaemonConfig.UI_FILTER_PARAMS));
+            FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
+            final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
+
+            final Integer httpsPort = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_HTTPS_PORT), 0);
+            final String httpsKsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PATH));
+            final String httpsKsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PASSWORD));
+            final String httpsKsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_TYPE));
+            final String httpsKeyPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEY_PASSWORD));
+            final String httpsTsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PATH));
+            final String httpsTsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD));
+            final String httpsTsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_TYPE));
+            final Boolean httpsWantClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_WANT_CLIENT_AUTH));
+            final Boolean httpsNeedClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_NEED_CLIENT_AUTH));
+
+            //TODO a better way to do this would be great.
+            LogviewerApplication.setup(conf);
+            ret = UIHelpers.jettyCreateServer(logviewerHttpPort, null, httpsPort);
+
+            UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword,
+                    httpsTsPath, httpsTsPassword, httpsTsType, httpsNeedClientAuth, httpsWantClientAuth);
+
+            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+            try {
+                context.setBaseResource(Resource.newResource(STATIC_RESOURCE_DIRECTORY_PATH));
+            } catch (IOException e) {
+                // keep the original failure as the cause so the reason is not lost
+                throw new RuntimeException("Can't locate static resource directory " + STATIC_RESOURCE_DIRECTORY_PATH, e);
+            }
+
+            context.setContextPath("/");
+            ret.setHandler(context);
+
+            // static content
+            ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class);
+            holderPwd.setInitOrder(1);
+            context.addServlet(holderPwd, "/");
+
+            // REST API
+            ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/api/v1/*");
+            jerseyServlet.setInitOrder(2);
+            jerseyServlet.setInitParameter("javax.ws.rs.Application", LogviewerApplication.class.getName());
+
+            UIHelpers.configFilters(context, filterConfigurations);
+        }
+        return ret;
+    }
+
+    // null when no HTTP port is configured, see mkHttpServer
+    private final Server httpServer;
+    private boolean closed = false;
+
+    /**
+     * Constructor.
+     * @param conf Logviewer conf for the servers
+     */
+    public LogviewerServer(Map<String, Object> conf) {
+        httpServer = mkHttpServer(conf);
+    }
+
+    /**
+     * Starts the HTTP server if one was configured.
+     */
+    @VisibleForTesting
+    void start() throws Exception {
+        LOG.info("Starting Logviewer...");
+        if (httpServer != null) {
+            httpServer.start();
+        }
+    }
+
+    /**
+     * Blocks until the HTTP server has stopped. Returns immediately when no HTTP server was configured.
+     */
+    @VisibleForTesting
+    void awaitTermination() throws InterruptedException {
+        // same guard as start(): without a configured port there is no server to join
+        if (httpServer != null) {
+            httpServer.join();
+        }
+    }
+
+    /**
+     * Marks the shutdown meter once; further calls are no-ops. The Jetty server itself is not stopped here.
+     */
+    @Override
+    public synchronized void close() {
+        if (!closed) {
+            //This is kind of useless...
+            meterShutdownCalls.mark();
+
+            //TODO this is causing issues...
+            //if (httpServer != null) {
+            //    httpServer.destroy();
+            //}
+
+            closed = true;
+        }
+    }
+
+    /**
+     * @return The port the HTTP server is listening on. Not available until {@link #start() } has run.
+     */
+    public int getHttpServerPort() {
+        // exactly one connector is expected on the logviewer server
+        assert httpServer.getConnectors().length == 1;
+
+        return httpServer.getConnectors()[0].getLocalPort();
+    }
+
+    /**
+     * Entry point of the logviewer daemon: reads the Storm configuration, starts the
+     * log cleaner, the metrics reporters and the HTTP server, then waits for the server to terminate.
+     */
+    public static void main(String [] args) throws Exception {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        Map<String, Object> stormConf = Utils.readStormConfig();
+
+        DirectoryCleaner cleaner = new DirectoryCleaner();
+
+        try (LogviewerServer logviewer = new LogviewerServer(stormConf);
+             LogCleaner logCleaner = new LogCleaner(stormConf, cleaner)) {
+            Utils.addShutdownHookWithForceKillIn1Sec(logviewer::close);
+            logCleaner.start();
+            StormMetricsRegistry.startMetricsReporters(stormConf);
+            logviewer.start();
+            logviewer.awaitTermination();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
new file mode 100644
index 0000000..b8f32f2
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.handler;
+
+import org.apache.storm.daemon.logviewer.utils.LogFileDownloader;
+import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+public class LogviewerLogDownloadHandler {
+
+    // does the actual work; this handler only selects the last downloadFile argument
+    private final LogFileDownloader downloader;
+
+    public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) {
+        downloader = new LogFileDownloader(logRoot, daemonLogRoot, resourceAuthorizer);
+    }
+
+    // Delegates with the last argument false (presumably "not a daemon log" - confirm against LogFileDownloader).
+    public Response downloadLogFile(String fileName, String user) throws IOException {
+        return downloader.downloadFile(fileName, user, false);
+    }
+
+    // Delegates with the last argument true (presumably "daemon log" - confirm against LogFileDownloader).
+    public Response downloadDaemonLogFile(String fileName, String user) throws IOException {
+        return downloader.downloadFile(fileName, user, true);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
new file mode 100644
index 0000000..0881b69
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.handler;
+
+import j2html.TagCreator;
+import j2html.tags.DomContent;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.LogviewerConstant;
+import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
+import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
+import org.apache.storm.daemon.utils.StreamUtil;
+import org.apache.storm.daemon.utils.URLBuilder;
+import org.apache.storm.ui.InvalidRequestException;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.Utils;
+import org.jooq.lambda.Unchecked;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+import javax.ws.rs.core.Response;
+
+import static j2html.TagCreator.*;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
+
+/**
+ * Builds the responses for listing log files and for the HTML pages that show
+ * one page of a worker log or of a daemon log.
+ */
+public class LogviewerLogPageHandler {
+    /** Largest number of bytes served for a single page, whatever length the request asks for. */
+    private static final int MAX_BYTES_PER_PAGE = 10485760;
+    /** File name suffixes that are shown as text; everything else is reported as binary. */
+    private static final Pattern TXT_FILE_PATTERN = Pattern.compile("\\.(log.*|txt|yaml|pid)$");
+
+    private final String logRoot;
+    private final String daemonLogRoot;
+    private final ResourceAuthorizer resourceAuthorizer;
+
+    public LogviewerLogPageHandler(String logRoot, String daemonLogRoot,
+                                   ResourceAuthorizer resourceAuthorizer) {
+        this.logRoot = logRoot;
+        this.daemonLogRoot = daemonLogRoot;
+        this.resourceAuthorizer = resourceAuthorizer;
+    }
+
+    /**
+     * Lists worker log files under the log root, optionally restricted to one topology and/or one port.
+     *
+     * @param user not used by this method
+     * @param port worker port to restrict the listing to, or null for all ports
+     * @param topologyId topology to restrict the listing to, or null for all topologies
+     * @param callback passed through to the JSON response builder
+     * @param origin passed through to the JSON response builder
+     * @return JSON response with the sorted list of log file identifiers
+     */
+    public Response listLogFiles(String user, Integer port, String topologyId, String callback, String origin) throws IOException {
+        List<File> fileResults = null;
+        if (topologyId == null) {
+            if (port == null) {
+                fileResults = WorkerLogs.getAllLogsForRootDir(new File(logRoot));
+            } else {
+                fileResults = new ArrayList<>();
+
+                // every topology directory may have a directory named after the port
+                File[] logRootFiles = new File(logRoot).listFiles();
+                if (logRootFiles != null) {
+                    for (File topoDir : logRootFiles) {
+                        File[] topoDirFiles = topoDir.listFiles();
+                        if (topoDirFiles != null) {
+                            for (File portDir : topoDirFiles) {
+                                if (portDir.getName().equals(port.toString())) {
+                                    fileResults.addAll(DirectoryCleaner.getFilesForDir(portDir));
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        } else {
+            if (port == null) {
+                fileResults = new ArrayList<>();
+
+                File topoDir = new File(logRoot + Utils.FILE_PATH_SEPARATOR + topologyId);
+                if (topoDir.exists()) {
+                    File[] topoDirFiles = topoDir.listFiles();
+                    if (topoDirFiles != null) {
+                        for (File portDir : topoDirFiles) {
+                            fileResults.addAll(DirectoryCleaner.getFilesForDir(portDir));
+                        }
+                    }
+                }
+
+            } else {
+                File portDir = ConfigUtils.getWorkerDirFromRoot(logRoot, topologyId, port);
+                if (portDir.exists()) {
+                    fileResults = DirectoryCleaner.getFilesForDir(portDir);
+                }
+            }
+        }
+
+        List<String> files;
+        if (fileResults != null) {
+            files = fileResults.stream()
+                    .map(WorkerLogs::getTopologyPortWorkerLog)
+                    .sorted().collect(toList());
+        } else {
+            files = new ArrayList<>();
+        }
+
+        return LogviewerResponseBuilder.buildSuccessJsonResponse(files, callback, origin);
+    }
+
+    /**
+     * Renders one page of a worker log file as HTML.
+     *
+     * @param fileName log file path relative to the log root
+     * @param start byte offset of the page, or null to show the tail of the file
+     * @param length page size in bytes, or null for the default page size
+     * @param grep when not empty, only lines containing this text are shown
+     * @param user requesting user, checked against the resource authorizer
+     * @return the HTML page, a "page not found" response or an "unauthorized" response
+     * @throws InvalidRequestException when {@code start} is at or beyond the end of the file
+     */
+    public Response logPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException {
+        if (!resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) {
+            // No whitelist for the file is treated as "no such log"; an existing whitelist that
+            // rejected the user is an authorization failure. (The two answers used to be swapped.)
+            if (resourceAuthorizer.getLogUserGroupWhitelist(fileName) == null) {
+                return LogviewerResponseBuilder.buildResponsePageNotFound();
+            } else {
+                return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
+            }
+        }
+
+        File file = new File(logRoot, fileName).getCanonicalFile();
+        String path = file.getCanonicalPath();
+        // expected layout: <logRoot>/<topology dir>/<port dir>/<file>
+        File portDirOfFile = file.getParentFile();
+        File topoDir = portDirOfFile != null ? portDirOfFile.getParentFile() : null;
+
+        if (topoDir == null || !file.exists()
+                || !new File(logRoot).getCanonicalFile().equals(topoDir.getParentFile())) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        long fileLength = path.endsWith(".gz") ? ServerUtils.zipFileSize(file) : file.length();
+
+        // listFiles() returns null when the directory cannot be read
+        File[] portDirs = topoDir.listFiles();
+        SortedSet<File> logFiles;
+        try {
+            logFiles = Arrays.stream(portDirs != null ? portDirs : new File[0])
+                    .flatMap(Unchecked.function(portDir -> DirectoryCleaner.getFilesForDir(portDir).stream()))
+                    .filter(File::isFile)
+                    .collect(toCollection(TreeSet::new));
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
+        }
+
+        // all other files of the topology first, the requested file last
+        List<String> reorderedFilesStr = new ArrayList<>(logFiles.stream()
+                .map(WorkerLogs::getTopologyPortWorkerLog)
+                .filter(fileStr -> !StringUtils.equals(fileName, fileStr))
+                .collect(toList()));
+        reorderedFilesStr.add(fileName);
+
+        return renderLogPage(fileName, path, fileLength, start, length, grep, user, reorderedFilesStr, false);
+    }
+
+    /**
+     * Renders one page of a daemon log file as HTML. Parameters are as for
+     * {@link #logPage(String, Integer, Integer, String, String)}, except that the file is resolved
+     * against the daemon log root and no authorization check is made here.
+     */
+    public Response daemonLogPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException {
+        File file = new File(daemonLogRoot, fileName).getCanonicalFile();
+        String path = file.getCanonicalPath();
+
+        // daemon logs live directly in the daemon log root
+        if (!file.exists() || !new File(daemonLogRoot).getCanonicalFile().equals(file.getParentFile())) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        long fileLength = path.endsWith(".gz") ? ServerUtils.zipFileSize(file) : file.length();
+
+        // all types of files included; listFiles() returns null when the directory cannot be read
+        File[] rootFiles = new File(daemonLogRoot).listFiles();
+        List<String> reorderedFilesStr = new ArrayList<>(Arrays.stream(rootFiles != null ? rootFiles : new File[0])
+                .filter(File::isFile)
+                .map(File::getName)
+                .filter(fName -> !StringUtils.equals(fileName, fName))
+                .collect(toList()));
+        reorderedFilesStr.add(fileName);
+
+        return renderLogPage(fileName, path, fileLength, start, length, grep, user, reorderedFilesStr, true);
+    }
+
+    /**
+     * Shared page rendering of {@link #logPage} and {@link #daemonLogPage}: reads the requested
+     * page and wraps it either in the grep result view or in the full view with pager, search form,
+     * file selection and download link.
+     */
+    private Response renderLogPage(String fileName, String path, long fileLength, Integer start, Integer length,
+                                   String grep, String user, List<String> fileChoices, boolean isDaemon)
+            throws IOException, InvalidRequestException {
+        int pageLength = length != null ? Math.min(MAX_BYTES_PER_PAGE, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
+        boolean isTxt = isTxtFile(fileName);
+        String type = isDaemon ? "daemonlog" : "log";
+
+        String rawLog;
+        if (isTxt) {
+            rawLog = start != null ? pageFile(path, start, pageLength) : pageFile(path, pageLength);
+        } else {
+            rawLog = "This is a binary file and cannot display! You may download the full file.";
+        }
+
+        List<DomContent> bodyContents = new ArrayList<>();
+        if (StringUtils.isNotEmpty(grep)) {
+            // match on the raw text and escape afterwards; matching on escaped text misses
+            // search terms containing characters that escapeHtml rewrites
+            bodyContents.add(pre(escapeHtml(grepLines(rawLog, grep))).withId("logContent"));
+        } else {
+            DomContent pagerData = null;
+            if (isTxt) {
+                // without an explicit start the tail is shown; clamp so that files shorter than
+                // one page start at 0 instead of a negative offset
+                int pageStart = start != null ? start : (int) Math.max(0L, fileLength - pageLength);
+                pagerData = pagerLinks(fileName, pageStart, pageLength, (int) fileLength, type);
+            }
+
+            bodyContents.add(searchFileForm(fileName, isDaemon ? "yes" : "no"));
+            // list all files the user can switch to
+            bodyContents.add(logFileSelectionForm(fileChoices, type));
+            if (pagerData != null) {
+                bodyContents.add(pagerData);
+            }
+            bodyContents.add(isDaemon ? daemonDownloadLink(fileName) : downloadLink(fileName));
+            bodyContents.add(pre(escapeHtml(rawLog)).withClass("logContent"));
+            if (pagerData != null) {
+                bodyContents.add(pagerData);
+            }
+        }
+
+        String content = logTemplate(bodyContents, fileName, user).render();
+        return LogviewerResponseBuilder.buildSuccessHtmlResponse(content);
+    }
+
+    /** Returns the lines of {@code text} that contain {@code needle}, joined by newlines. */
+    private static String grepLines(String text, String needle) {
+        return String.join("\n", Arrays.stream(text.split("\n"))
+                .filter(line -> line.contains(needle)).collect(toList()));
+    }
+
+    private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) {
+        List<DomContent> finalBodyContents = new ArrayList<>();
+
+        if (StringUtils.isNotBlank(user)) {
+            finalBodyContents.add(div(p("User: " + user)).withClass("ui-user"));
+        }
+
+        finalBodyContents.add(div(p("Note: the drop-list shows at most 1024 files for each worker directory.")).withClass("ui-note"));
+        finalBodyContents.add(h3(escapeHtml(fileName)));
+        finalBodyContents.addAll(bodyContents);
+
+        return html(
+                head(
+                        title(escapeHtml(fileName) + " - Storm Log Viewer"),
+                        link().withRel("stylesheet").withHref("/css/bootstrap-3.3.1.min.css"),
+                        link().withRel("stylesheet").withHref("/css/jquery.dataTables.1.10.4.min.css"),
+                        link().withRel("stylesheet").withHref("/css/style.css")
+                ),
+                body(
+                        finalBodyContents.toArray(new DomContent[]{})
+                )
+        );
+    }
+
+    private DomContent downloadLink(String fileName) {
+        return p(linkTo(UIHelpers.urlFormat("/api/v1/download?file=%s", fileName), "Download Full File"));
+    }
+
+    private DomContent daemonDownloadLink(String fileName) {
+        return p(linkTo(UIHelpers.urlFormat("/api/v1/daemondownload?file=%s", fileName), "Download Full File"));
+    }
+
+    private DomContent linkTo(String url, String content) {
+        return a(content).withHref(url);
+    }
+
+    private DomContent logFileSelectionForm(List<String> logFiles, String type) {
+        return form(
+                dropDown("file", logFiles),
+                input().withType("submit").withValue("Switch file")
+        ).withAction(type).withId("list-of-files");
+    }
+
+    private DomContent dropDown(String name, List<String> logFiles) {
+        List<DomContent> options = logFiles.stream().map(TagCreator::option).collect(toList());
+        return select(options.toArray(new DomContent[]{})).withName(name).withId(name);
+    }
+
+    private DomContent searchFileForm(String fileName, String isDaemonValue) {
+        return form(
+                text("search this file:"),
+                input().withType("text").withName("search"),
+                input().withType("hidden").withName("is-daemon").withValue(isDaemonValue),
+                input().withType("hidden").withName("file").withValue(fileName),
+                input().withType("submit").withValue("Search")
+        ).withAction("/logviewer_search.html").withId("search-box");
+    }
+
+    /**
+     * Builds the Prev / First / Last / Next buttons for the page starting at {@code start}.
+     * Prev and Next are rendered disabled when they would not move the page.
+     */
+    private DomContent pagerLinks(String fileName, Integer start, Integer length, Integer fileLength, String type) {
+        int prevStart = Math.max(0, start - length);
+        int nextStart = fileLength > 0 ? Math.min(Math.max(0, fileLength - length), start + length) : start + length;
+        String endpoint = "/api/v1/" + type;
+        List<DomContent> btnLinks = new ArrayList<>();
+
+        btnLinks.add(toButtonLink(pageUrl(endpoint, fileName, prevStart, length), "Prev", prevStart < start));
+        btnLinks.add(toButtonLink(pageUrl(endpoint, fileName, 0, length), "First"));
+        // no start parameter: the page handlers then show the tail of the file
+        btnLinks.add(toButtonLink(pageUrl(endpoint, fileName, null, length), "Last"));
+        btnLinks.add(toButtonLink(
+                pageUrl(endpoint, fileName, Math.min(Math.max(0, fileLength - length), start + length), length),
+                "Next", nextStart > start));
+
+        return div(btnLinks.toArray(new DomContent[]{}));
+    }
+
+    /** Builds a page URL with the file and length parameters, plus start when it is not null. */
+    private String pageUrl(String endpoint, String fileName, Integer start, Integer length) {
+        Map<String, Object> urlQueryParams = new HashMap<>();
+        urlQueryParams.put("file", fileName);
+        if (start != null) {
+            urlQueryParams.put("start", start);
+        }
+        urlQueryParams.put("length", length);
+        return URLBuilder.build(endpoint, urlQueryParams);
+    }
+
+    private DomContent toButtonLink(String url, String text) {
+        return toButtonLink(url, text, true);
+    }
+
+    private DomContent toButtonLink(String url, String text, boolean enabled) {
+        return a(text).withHref(url).withClass("btn btn-default " + (enabled ? "enabled" : "disabled"));
+    }
+
+    /** Reads the last {@code tail} bytes of the file (the whole file when it is shorter). */
+    private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException {
+        boolean isZipFile = path.endsWith(".gz");
+        long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
+        // may be negative for short files; the three-argument overload then reads from the beginning
+        long skip = fileLength - tail;
+        return pageFile(path, Long.valueOf(skip).intValue(), tail);
+    }
+
+    /**
+     * Reads up to {@code length} bytes of the (possibly gzipped) file, beginning at byte {@code start}.
+     *
+     * @throws InvalidRequestException when {@code start} is at or beyond the end of the file
+     */
+    private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException {
+        boolean isZipFile = path.endsWith(".gz");
+        long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
+
+        try (InputStream input = isZipFile ? new GZIPInputStream(new FileInputStream(path)) : new FileInputStream(path);
+             ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+            if (start >= fileLength) {
+                throw new InvalidRequestException("Cannot start past the end of the file");
+            }
+            if (start > 0) {
+                StreamUtil.skipBytes(input, start);
+            }
+
+            byte[] buffer = new byte[1024];
+            while (output.size() < length) {
+                int size = input.read(buffer, 0, Math.min(1024, length - output.size()));
+                if (size > 0) {
+                    output.write(buffer, 0, size);
+                } else {
+                    break;
+                }
+            }
+
+            return output.toString();
+        }
+    }
+
+    private boolean isTxtFile(String fileName) {
+        Matcher matcher = TXT_FILE_PATTERN.matcher(fileName);
+        return matcher.find();
+    }
+}
