STORM-1280 port backtype.storm.daemon.logviewer to java * port logviewer to Java using Dropwizard * follow implementation of DRPC Server * add '/api/v1/' to the logviewer API endpoints * seems like it is not possible (or not easy) to serve both API endpoints and static resources in same servlet * reflect the change to the static resources * TODO: logviewer test should be ported to Java since ported version of Logviewer is placed to storm-webapp module * can't remove logviewer.clj for now, hence adding 'wip' to package to avoid clash * ported version of Logviewer is manually tested, not testing with logviewer-test
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11a79053 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11a79053 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11a79053 Branch: refs/heads/master Commit: 11a79053f7a9250b7347013cafff0e8eec79ec70 Parents: 559053d Author: Jungtaek Lim <[email protected]> Authored: Tue Jul 11 13:49:56 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Fri Jul 14 12:11:41 2017 +0900 ---------------------------------------------------------------------- bin/storm.py | 8 +- pom.xml | 3 + .../org/apache/storm/daemon/StormCommon.java | 2 - storm-core/src/clj/org/apache/storm/ui/core.clj | 10 +- .../apache/storm/daemon/DirectoryCleaner.java | 52 +- storm-core/src/ui/public/component.html | 2 +- .../src/ui/public/deep_search_result.html | 2 +- storm-core/src/ui/public/logviewer_search.html | 2 +- storm-core/src/ui/public/search_result.html | 4 +- .../storm/hack/StormShadeTransformer.java | 5 - .../reporters/JmxPreparableReporter.java | 1 - .../storm/daemon/supervisor/BasicContainer.java | 1 - storm-webapp/pom.xml | 53 +- .../common/AuthorizationExceptionMapper.java | 40 ++ .../daemon/common/JsonResponseBuilder.java | 66 ++ .../webapp/AuthorizationExceptionMapper.java | 40 -- .../daemon/drpc/webapp/DRPCApplication.java | 1 + .../daemon/utils/ListFunctionalSupport.java | 69 ++ .../apache/storm/daemon/utils/StreamUtil.java | 38 + .../apache/storm/daemon/utils/URLBuilder.java | 43 ++ .../daemon/wip/logviewer/LogviewerConstant.java | 23 + .../daemon/wip/logviewer/LogviewerServer.java | 174 +++++ .../handler/LogviewerLogDownloadHandler.java | 43 ++ .../handler/LogviewerLogPageHandler.java | 412 +++++++++++ .../handler/LogviewerLogSearchHandler.java | 686 +++++++++++++++++++ .../handler/LogviewerProfileHandler.java | 115 ++++ .../daemon/wip/logviewer/utils/LogCleaner.java | 296 ++++++++ .../wip/logviewer/utils/LogFileDownloader.java | 51 ++ .../utils/LogviewerResponseBuilder.java | 118 ++++ .../wip/logviewer/utils/ResourceAuthorizer.java | 129 ++++ .../daemon/wip/logviewer/utils/WorkerLogs.java | 63 ++ .../logviewer/webapp/LogviewerApplication.java | 94 +++ .../wip/logviewer/webapp/LogviewerResource.java | 221 ++++++ 33 files changed, 2773 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/bin/storm.py ---------------------------------------------------------------------- diff --git a/bin/storm.py b/bin/storm.py index 8f7a635..dade6b5 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -808,12 +808,16 @@ def logviewer(): "-DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector", "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml") ] + + allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR) + allextrajars.append(CLUSTER_CONF_DIR) exec_storm_class( - "org.apache.storm.daemon.logviewer", + "org.apache.storm.daemon.wip.logviewer.LogviewerServer", jvmtype="-server", daemonName="logviewer", jvmopts=jvmopts, - extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) + extrajars=allextrajars) + def drpcclient(*args): """Syntax: [storm drpc-client [options] ([function argument]*)|(argument*)] http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index eb3ba05..ffee0cd 100644 --- a/pom.xml +++ b/pom.xml @@ -310,6 +310,9 @@ <qpid.version>0.32</qpid.version> <azure-eventhubs.version>0.13.1</azure-eventhubs.version> <jersey.version>2.24.1</jersey.version> + <dropwizard.version>1.1.2</dropwizard.version> + <j2html.version>1.0.0</j2html.version> + <jool.version>0.9.12</jool.version> <!-- see intellij profile below... This fixes an annoyance with intellij --> <provided.scope>provided</provided.scope> http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java index 3f85a13..9b5163b 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java @@ -52,10 +52,8 @@ import org.apache.storm.task.IBolt; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.ThriftTopologyUtils; import org.apache.storm.utils.Utils; import org.apache.storm.utils.ObjectReader; -import org.apache.storm.utils.ThriftTopologyUtils; import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index d1bdee8..5b0613c 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -134,12 +134,12 @@ (defn logviewer-link [host fname secure?] (if (and secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT)) - (UIHelpers/urlFormat "https://%s:%s/log?file=%s" + (UIHelpers/urlFormat "https://%s:%s/api/v1/log?file=%s" (to-array [host (*STORM-CONF* LOGVIEWER-HTTPS-PORT) fname])) - (UIHelpers/urlFormat "http://%s:%s/log?file=%s" + (UIHelpers/urlFormat "http://%s:%s/api/v1/log?file=%s" (to-array [host (*STORM-CONF* LOGVIEWER-PORT) @@ -156,10 +156,10 @@ (logviewer-link host fname secure?)))) (defn nimbus-log-link [host] - (UIHelpers/urlFormat "http://%s:%s/daemonlog?file=nimbus.log" (to-array [host (*STORM-CONF* LOGVIEWER-PORT)]))) + (UIHelpers/urlFormat "http://%s:%s/api/v1/daemonlog?file=nimbus.log" (to-array [host (*STORM-CONF* LOGVIEWER-PORT)]))) (defn supervisor-log-link [host] - (UIHelpers/urlFormat "http://%s:%s/daemonlog?file=supervisor.log" (to-array [host (*STORM-CONF* LOGVIEWER-PORT)]))) + (UIHelpers/urlFormat "http://%s:%s/api/v1/daemonlog?file=supervisor.log" (to-array [host (*STORM-CONF* LOGVIEWER-PORT)]))) (defn get-error-data [error] @@ -185,7 +185,7 @@ (.get_error_time_secs ^ErrorInfo error))) (defn worker-dump-link [host port topology-id] - (UIHelpers/urlFormat "http://%s:%s/dumps/%s/%s" + (UIHelpers/urlFormat "http://%s:%s/api/v1/dumps/%s/%s" (to-array [(URLEncoder/encode host) (*STORM-CONF* LOGVIEWER-PORT) (URLEncoder/encode topology-id) http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java b/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java index 3a6eeba..dc76157 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java @@ -55,15 +55,15 @@ public class DirectoryCleaner { /** * If totalSize of files exceeds the either the per-worker quota or global quota, * Logviewer deletes oldest inactive log files in a worker directory or in all worker dirs. - * We use the parameter for_per_dir to switch between the two deletion modes. + * We use the parameter forPerDir to switch between the two deletion modes. * @param dirs the list of directories to be scanned for deletion * @param quota the per-dir quota or the total quota for the all directories - * @param for_per_dir if true, deletion happens for a single dir; otherwise, for all directories globally - * @param active_dirs only for global deletion, we want to skip the active logs in active_dirs + * @param forPerDir if true, deletion happens for a single dir; otherwise, for all directories globally + * @param activeDirs only for global deletion, we want to skip the active logs in activeDirs * @return number of files deleted */ public int deleteOldestWhileTooLarge(List<File> dirs, - long quota, boolean for_per_dir, Set<String> active_dirs) throws IOException { + long quota, boolean forPerDir, Set<String> activeDirs) throws IOException { final int PQ_SIZE = 1024; // max number of files to delete for every round final int MAX_ROUNDS = 512; // max rounds of scanning the dirs long totalSize = 0; @@ -73,10 +73,17 @@ public class DirectoryCleaner { try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { for (Path path : stream) { File file = path.toFile(); + + if (isFileEligibleToSkipDelete(forPerDir, activeDirs, dir, file)) { + continue; // skip adding length + } + totalSize += file.length(); } } } + + LOG.debug("totalSize: {} quota: {}", totalSize, quota); long toDeleteSize = totalSize - quota; if (toDeleteSize <= 0) { return deletedFiles; @@ -100,20 +107,8 @@ public class DirectoryCleaner { try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { for (Path path : stream) { File file = path.toFile(); - if (for_per_dir) { - if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { - continue; // skip active log files - } - } else { // for global cleanup - if (active_dirs.contains(dir.getCanonicalPath())) { // for an active worker's dir, make sure for the last "/" - if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { - continue; // skip active log files - } - } else { - if (META_LOG_PATTERN.matcher(file.getName()).matches()) { - continue; // skip yaml and pid files - } - } + if (isFileEligibleToSkipDelete(forPerDir, activeDirs, dir, file)) { + continue; } if (pq.size() < PQ_SIZE) { pq.offer(file); @@ -142,7 +137,7 @@ public class DirectoryCleaner { pq.clear(); round++; if (round >= MAX_ROUNDS) { - if (for_per_dir) { + if (forPerDir) { LOG.warn("Reach the MAX_ROUNDS: {} during per-dir deletion, you may have too many files in " + "a single directory : {}, will delete the rest files in next interval.", MAX_ROUNDS, dirs.get(0).getCanonicalPath()); @@ -156,6 +151,25 @@ public class DirectoryCleaner { return deletedFiles; } + private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException { + if (forPerDir) { + if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { + return true; + } + } else { // for global cleanup + if (activeDirs.contains(dir.getCanonicalPath())) { // for an active worker's dir, make sure for the last "/" + if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { + return true; + } + } else { + if (META_LOG_PATTERN.matcher(file.getName()).matches()) { + return true; + } + } + } + return false; + } + // Note that to avoid memory problem, we only return the first 1024 files in a directory public static List<File> getFilesForDir(File dir) throws IOException { List<File> files = new ArrayList<File>(); http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-core/src/ui/public/component.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/component.html b/storm-core/src/ui/public/component.html index 942b12d..4d602f2 100644 --- a/storm-core/src/ui/public/component.html +++ b/storm-core/src/ui/public/component.html @@ -182,7 +182,7 @@ $(document).ready(function() { var loc = $(row[3])[0]; // logviewer URL return '<input type="checkbox" class="workerActionCheckbox"'+ 'id="'+checkboxId+'" value="'+host_port+'"'+checkedString+'/> '+ - '<a href="'+loc.protocol+'//'+loc.host+'/dumps/'+topologyId+'/'+ + '<a href="'+loc.protocol+'//'+loc.host+'/api/v1/dumps/'+topologyId+'/'+ encodeURIComponent(host_port)+'">files</a>'; break; case 'sort': http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-core/src/ui/public/deep_search_result.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/deep_search_result.html b/storm-core/src/ui/public/deep_search_result.html index 5de2ab8..5f5e5c6 100644 --- a/storm-core/src/ui/public/deep_search_result.html +++ b/storm-core/src/ui/public/deep_search_result.html @@ -120,7 +120,7 @@ $(document).ready(function() { for (var host in distinct_hosts) { - var searchURL = "http://"+host+":"+logviewerPort+"/deepSearch/"+id+"?search-string="+search+"&num-matches="+count+"&port="+port; + var searchURL = "http://"+host+":"+logviewerPort+"/api/v1/deepSearch/"+id+"?search-string="+search+"&num-matches="+count+"&port="+port; if(search_archived) searchURL += "&search-archived=" + search_archived; http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-core/src/ui/public/logviewer_search.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/logviewer_search.html b/storm-core/src/ui/public/logviewer_search.html index 3a1682e..89a7854 100644 --- a/storm-core/src/ui/public/logviewer_search.html +++ b/storm-core/src/ui/public/logviewer_search.html @@ -55,7 +55,7 @@ $(document).ready(function() { $("#search-form").append(Mustache.render($(template).filter("#search-single-file").html(),{file: file, search: search, isDaemon: isDaemon})); var result = $("#result"); - var url = "/search/"+encodeURIComponent(file)+"?search-string="+search+"&start-byte-offset="+offset+"&is-daemon="+isDaemon; + var url = "/api/v1/search/"+encodeURIComponent(file)+"?search-string="+search+"&start-byte-offset="+offset+"&is-daemon="+isDaemon; $.getJSON(url,function(response,status,jqXHR) { response.file = file; result.append(Mustache.render($(template).filter("#logviewer-search-result-template").html(),response)); http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-core/src/ui/public/search_result.html ---------------------------------------------------------------------- diff --git a/storm-core/src/ui/public/search_result.html b/storm-core/src/ui/public/search_result.html index ee0efd8..216a700 100644 --- a/storm-core/src/ui/public/search_result.html +++ b/storm-core/src/ui/public/search_result.html @@ -68,9 +68,9 @@ $(document).ready(function() { var port = response.hostPortList[index].port; var elemId = response.hostPortList[index].elemId; var file = id+"/"+port+"/worker.log"; - var searchURL = "http://"+host+":"+logviewerPort+"/search/"+encodeURIComponent(file)+"?search-string="+search+"&num-matches="+count; + var searchURL = "http://"+host+":"+logviewerPort+"/api/v1/search/"+encodeURIComponent(file)+"?search-string="+search+"&num-matches="+count; if (searchArchived != "") { - searchURL = "http://"+host+":"+logviewerPort+"/deepSearch/"+id+"?search-string="+search+"&num-matches="+count+"&search-archived=true&port="+port; + searchURL = "http://"+host+":"+logviewerPort+"/api/v1/deepSearch/"+id+"?search-string="+search+"&num-matches="+count+"&search-archived=true&port="+port; } $.ajax({dataType: "json", http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-rename-hack/src/main/java/org/apache/storm/hack/StormShadeTransformer.java ---------------------------------------------------------------------- diff --git a/storm-rename-hack/src/main/java/org/apache/storm/hack/StormShadeTransformer.java b/storm-rename-hack/src/main/java/org/apache/storm/hack/StormShadeTransformer.java index ba1b1ec..8afbcea 100644 --- a/storm-rename-hack/src/main/java/org/apache/storm/hack/StormShadeTransformer.java +++ b/storm-rename-hack/src/main/java/org/apache/storm/hack/StormShadeTransformer.java @@ -19,13 +19,8 @@ package org.apache.storm.hack; import org.apache.storm.daemon.JarTransformer; -import org.apache.storm.hack.relocation.Relocator; -import org.apache.storm.hack.relocation.SimpleRelocator; -import org.apache.storm.hack.resource.ClojureTransformer; -import org.apache.storm.hack.resource.ResourceTransformer; import java.io.*; -import java.util.Arrays; public class StormShadeTransformer implements JarTransformer { @Override http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java index 48eef02..21aab16 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java @@ -19,7 +19,6 @@ package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; -import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.utils.ObjectReader; http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java index f5eed43..fe088dc 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java @@ -22,7 +22,6 @@ import static org.apache.storm.utils.Utils.OR; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/pom.xml ---------------------------------------------------------------------- diff --git a/storm-webapp/pom.xml b/storm-webapp/pom.xml index eed6050..eeb6ee6 100644 --- a/storm-webapp/pom.xml +++ b/storm-webapp/pom.xml @@ -35,7 +35,7 @@ <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>2.0.0-SNAPSHOT</version> - <scope>provided</scope> + <scope>${provided.scope}</scope> </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> @@ -75,24 +75,49 @@ <scope>test</scope> </dependency> <dependency> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> - <groupId>org.glassfish.jersey.core</groupId> - <artifactId>jersey-server</artifactId> - </dependency> + <groupId>io.dropwizard</groupId> + <artifactId>dropwizard-core</artifactId> + <version>${dropwizard.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> - <groupId>org.glassfish.jersey.containers</groupId> - <artifactId>jersey-container-servlet-core</artifactId> - </dependency> + <groupId>io.dropwizard</groupId> + <artifactId>dropwizard-assets</artifactId> + <version>${dropwizard.version}</version> + </dependency> + <dependency> + <groupId>com.j2html</groupId> + <artifactId>j2html</artifactId> + <version>${j2html.version}</version> + </dependency> <dependency> - <groupId>org.glassfish.jersey.containers</groupId> - <artifactId>jersey-container-jetty-http</artifactId> + <groupId>org.jooq</groupId> + <artifactId>jool</artifactId> + <version>${jool.version}</version> + </dependency> + <dependency> + <groupId>io.dropwizard</groupId> + <artifactId>dropwizard-testing</artifactId> + <version>${dropwizard.version}</version> + <scope>test</scope> </dependency> </dependencies> <build> @@ -118,7 +143,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>4</maxAllowedViolations> + <maxAllowedViolations>500</maxAllowedViolations> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/common/AuthorizationExceptionMapper.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/common/AuthorizationExceptionMapper.java b/storm-webapp/src/main/java/org/apache/storm/daemon/common/AuthorizationExceptionMapper.java new file mode 100644 index 0000000..57467e1 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/common/AuthorizationExceptionMapper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.common; + +import java.util.HashMap; +import java.util.Map; + +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +import org.apache.storm.generated.AuthorizationException; +import org.json.simple.JSONValue; + +@Provider +public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> { + @Override + public Response toResponse(AuthorizationException ex) { + Map<String, String> body = new HashMap<>(); + body.put("error", "Not Authorized"); + body.put("errorMessage", ex.get_msg()); + return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/common/JsonResponseBuilder.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/common/JsonResponseBuilder.java b/storm-webapp/src/main/java/org/apache/storm/daemon/common/JsonResponseBuilder.java new file mode 100644 index 0000000..2048d81 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/common/JsonResponseBuilder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.common; + +import org.apache.storm.ui.UIHelpers; + +import javax.ws.rs.core.Response; +import java.util.Collections; +import java.util.Map; + +public class JsonResponseBuilder { + private Object data; + private String callback; + private boolean needSerialize = true; + private int status = 200; + private Map<String, Object> headers = Collections.emptyMap(); + + public JsonResponseBuilder setData(Object data) { + this.data = data; + return this; + } + + public JsonResponseBuilder setCallback(String callback) { + this.callback = callback; + return this; + } + + public JsonResponseBuilder setNeedSerialize(boolean needSerialize) { + this.needSerialize = needSerialize; + return this; + } + + public JsonResponseBuilder setStatus(int status) { + this.status = status; + return this; + } + + public JsonResponseBuilder setHeaders(Map<String, Object> headers) { + this.headers = headers; + return this; + } + + public Response build() { + String body = UIHelpers.getJsonResponseBody(data, callback, needSerialize); + Map<String, Object> respHeaders = UIHelpers.getJsonResponseHeaders(callback, headers); + Response.ResponseBuilder respBuilder = Response.status(status).entity(body); + respHeaders.forEach(respBuilder::header); + return respBuilder.build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java deleted file mode 100644 index fcef715..0000000 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.daemon.drpc.webapp; - -import java.util.HashMap; -import java.util.Map; - -import javax.ws.rs.core.Response; -import javax.ws.rs.ext.ExceptionMapper; -import javax.ws.rs.ext.Provider; - -import org.apache.storm.generated.AuthorizationException; -import org.json.simple.JSONValue; - -@Provider -public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> { - @Override - public Response toResponse(AuthorizationException ex) { - Map<String, String> body = new HashMap<>(); - body.put("error", "Not Authorized"); - body.put("errorMessage", ex.get_msg()); - return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java index 0074b05..db42310 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java @@ -25,6 +25,7 @@ import javax.ws.rs.ApplicationPath; import javax.ws.rs.core.Application; import org.apache.storm.daemon.drpc.DRPC; +import org.apache.storm.daemon.common.AuthorizationExceptionMapper; @ApplicationPath("") public class DRPCApplication extends Application { http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/utils/ListFunctionalSupport.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/utils/ListFunctionalSupport.java b/storm-webapp/src/main/java/org/apache/storm/daemon/utils/ListFunctionalSupport.java new file mode 100644 index 0000000..83b3588 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/utils/ListFunctionalSupport.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.utils; + +import java.util.List; +import java.util.stream.Collectors; + +public class ListFunctionalSupport { + public static <T> T first(List<T> list) { + if (list == null || list.size() <= 0) { + return null; + } + + return list.get(0); + } + + public static <T> List<T> takeLast(List<T> list, int count) { + if (list == null) { + return null; + } + + if (list.size() <= count) { + return list; + } else { + return list.stream() + .skip(list.size() - count) + .limit(count) + .collect(Collectors.toList()); + } + } + + public static <T> List<T> drop(List<T> list, int count) { + if (list == null) { + return null; + } + + return list.stream() + .skip(count) + .collect(Collectors.toList()); + } + + public static <T> List<T> rest(List<T> list) { + return drop(list, 1); + } + + public static <T> T last(List<T> list) { + if (list == null || list.size() <= 0) { + return null; + } + + return list.get(list.size() - 1); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/utils/StreamUtil.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/utils/StreamUtil.java b/storm-webapp/src/main/java/org/apache/storm/daemon/utils/StreamUtil.java new file mode 100644 index 0000000..24c630d --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/utils/StreamUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.utils; + +import java.io.IOException; +import java.io.InputStream; + +public class StreamUtil { + private StreamUtil() { + } + + /** + * FileInputStream#skip may not work the first time, so ensure it successfully skips the given number of bytes. + */ + public static void skipBytes(InputStream stream, Integer n) throws IOException { + long skipped = 0; + do { + skipped = skipped + stream.skip(n - skipped); + } while (skipped < n); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/utils/URLBuilder.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/utils/URLBuilder.java b/storm-webapp/src/main/java/org/apache/storm/daemon/utils/URLBuilder.java new file mode 100644 index 0000000..d30ac65 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/utils/URLBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.utils; + +import java.net.URLEncoder; +import java.util.Map; + +import static java.util.stream.Collectors.joining; + +public class URLBuilder { + private URLBuilder() { + } + + public static String build(String urlPath, Map<String, Object> parameters) { + StringBuilder sb = new StringBuilder(); + sb.append(urlPath); + if (parameters.size() > 0) { + sb.append("?"); + + String queryParam = parameters.entrySet().stream() + .map(entry -> URLEncoder.encode(entry.getKey()) + "=" + URLEncoder.encode(entry.getValue().toString())) + .collect(joining("&")); + sb.append(queryParam); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java new file mode 100644 index 0000000..01cc0bc --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.wip.logviewer; + +public class LogviewerConstant { + public static final int DEFAULT_BYTES_PER_PAGE = 51200; +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java new file mode 100644 index 0000000..24ccf6e --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.wip.logviewer; + +import com.codahale.metrics.Meter; +import com.google.common.annotations.VisibleForTesting; +import org.apache.storm.DaemonConfig; +import org.apache.storm.daemon.drpc.webapp.ReqContextFilter; +import org.apache.storm.daemon.wip.logviewer.utils.LogCleaner; +import org.apache.storm.daemon.wip.logviewer.webapp.LogviewerApplication; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IHttpCredentialsPlugin; +import org.apache.storm.ui.FilterConfiguration; +import org.apache.storm.ui.UIHelpers; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.servlet.FilterMapping; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.resource.Resource; +import org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES; + +public class LogviewerServer implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class); + private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls"); + public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public"; + + private static Server mkHttpServer(Map<String, Object> conf) { + Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT); + Server ret = null; + if (logviewerHttpPort != null && logviewerHttpPort >= 0) { + LOG.info("Starting Logviewer HTTP servers..."); + Integer headerBufferSize = ObjectReader.getInt(conf.get(UI_HEADER_BUFFER_BYTES)); + String filterClass = (String) (conf.get(DaemonConfig.UI_FILTER)); + @SuppressWarnings("unchecked") + Map<String, String> filterParams = (Map<String, String>) (conf.get(DaemonConfig.UI_FILTER_PARAMS)); + FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams); + final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration); + + final Integer httpsPort = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_HTTPS_PORT), 0); + final String httpsKsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PATH)); + final String httpsKsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PASSWORD)); + final String httpsKsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_TYPE)); + final String httpsKeyPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEY_PASSWORD)); + final String httpsTsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PATH)); + final String httpsTsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD)); + final String httpsTsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_TYPE)); + final Boolean httpsWantClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_WANT_CLIENT_AUTH)); + final Boolean httpsNeedClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_NEED_CLIENT_AUTH)); + + //TODO a better way to do this would be great. + LogviewerApplication.setup(conf); + ret = UIHelpers.jettyCreateServer(logviewerHttpPort, null, httpsPort); + + UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, + httpsTsPath, httpsTsPassword, httpsTsType, httpsNeedClientAuth, httpsWantClientAuth); + + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + try { + context.setBaseResource(Resource.newResource(STATIC_RESOURCE_DIRECTORY_PATH)); + } catch (IOException e) { + throw new RuntimeException("Can't locate static resource directory " + STATIC_RESOURCE_DIRECTORY_PATH); + } + + context.setContextPath("/"); + ret.setHandler(context); + + ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class); + holderPwd.setInitOrder(1); + context.addServlet(holderPwd,"/"); + + ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/api/v1/*"); + jerseyServlet.setInitOrder(2); + jerseyServlet.setInitParameter("javax.ws.rs.Application", LogviewerApplication.class.getName()); + + UIHelpers.configFilters(context, filterConfigurations); + } + return ret; + } + + private final Server httpServer; + private boolean closed = false; + + /** + * Constructor. + * @param conf Logviewer conf for the servers + */ + public LogviewerServer(Map<String, Object> conf) { + httpServer = mkHttpServer(conf); + } + + @VisibleForTesting + void start() throws Exception { + LOG.info("Starting Logviewer..."); + if (httpServer != null) { + httpServer.start(); + } + } + + @VisibleForTesting + void awaitTermination() throws InterruptedException { + httpServer.join(); + } + + @Override + public synchronized void close() { + if (!closed) { + //This is kind of useless... + meterShutdownCalls.mark(); + + //TODO this is causing issues... + //if (httpServer != null) { + // httpServer.destroy(); + //} + + closed = true; + } + } + + /** + * @return The port the HTTP server is listening on. Not available until {@link #start() } has run. + */ + public int getHttpServerPort() { + assert httpServer.getConnectors().length == 1; + + return httpServer.getConnectors()[0].getLocalPort(); + } + + /** + * Main method to start the server. + */ + public static void main(String [] args) throws Exception { + Utils.setupDefaultUncaughtExceptionHandler(); + Map<String, Object> conf = Utils.readStormConfig(); + + try (LogviewerServer server = new LogviewerServer(conf); + LogCleaner logCleaner = new LogCleaner(conf)) { + Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close()); + logCleaner.start(); + StormMetricsRegistry.startMetricsReporters(conf); + server.start(); + server.awaitTermination(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java new file mode 100644 index 0000000..26d1d63 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.wip.logviewer.handler; + +import org.apache.storm.daemon.wip.logviewer.utils.LogFileDownloader; +import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; + +import javax.ws.rs.core.Response; +import java.io.IOException; + +public class LogviewerLogDownloadHandler { + + private final LogFileDownloader logFileDownloadHelper; + + public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) { + this.logFileDownloadHelper = new LogFileDownloader(logRoot, daemonLogRoot, resourceAuthorizer); + } + + public Response downloadLogFile(String fileName, String user) throws IOException { + return logFileDownloadHelper.downloadFile(fileName, user, false); + } + + public Response downloadDaemonLogFile(String fileName, String user) throws IOException { + return logFileDownloadHelper.downloadFile(fileName, user, true); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java new file mode 100644 index 0000000..0e623f5 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.wip.logviewer.handler; + +import j2html.TagCreator; +import j2html.tags.DomContent; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.daemon.DirectoryCleaner; +import org.apache.storm.daemon.utils.StreamUtil; +import org.apache.storm.daemon.utils.URLBuilder; +import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder; +import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; +import org.apache.storm.daemon.wip.logviewer.utils.WorkerLogs; +import org.apache.storm.ui.InvalidRequestException; +import org.apache.storm.ui.UIHelpers; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.ServerUtils; +import org.apache.storm.utils.Utils; +import org.jooq.lambda.Unchecked; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; + +import javax.ws.rs.core.Response; + +import static j2html.TagCreator.*; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toList; +import static org.apache.commons.lang.StringEscapeUtils.escapeHtml; +import static org.apache.storm.daemon.wip.logviewer.LogviewerConstant.DEFAULT_BYTES_PER_PAGE; + +public class LogviewerLogPageHandler { + private final String logRoot; + private final String daemonLogRoot; + private final ResourceAuthorizer resourceAuthorizer; + + public LogviewerLogPageHandler(String logRoot, String daemonLogRoot, + ResourceAuthorizer resourceAuthorizer) { + this.logRoot = logRoot; + this.daemonLogRoot = daemonLogRoot; + this.resourceAuthorizer = resourceAuthorizer; + } + + public Response listLogFiles(String user, Integer port, String topologyId, String callback, String origin) throws IOException { + List<File> fileResults = null; + if (topologyId == null) { + if (port == null) { + fileResults = WorkerLogs.getAllLogsForRootDir(new File(logRoot)); + } else { + fileResults = new ArrayList<>(); + + File[] logRootFiles = new File(logRoot).listFiles(); + if (logRootFiles != null) { + for (File topoDir : logRootFiles) { + File[] topoDirFiles = topoDir.listFiles(); + if (topoDirFiles != null) { + for (File portDir : topoDirFiles) { + if (portDir.getName().equals(port.toString())) { + fileResults.addAll(DirectoryCleaner.getFilesForDir(portDir)); + } + } + } + } + } + } + } else { + if (port == null) { + fileResults = new ArrayList<>(); + + File topoDir = new File(logRoot + Utils.FILE_PATH_SEPARATOR + topologyId); + if (topoDir.exists()) { + File[] topoDirFiles = topoDir.listFiles(); + if (topoDirFiles != null) { + for (File portDir : topoDirFiles) { + fileResults.addAll(DirectoryCleaner.getFilesForDir(portDir)); + } + } + } + + } else { + File portDir = ConfigUtils.getWorkerDirFromRoot(logRoot, topologyId, port); + if (portDir.exists()) { + fileResults = DirectoryCleaner.getFilesForDir(portDir); + } + } + } + + List<String> files; + if (fileResults != null) { + files = fileResults.stream() + .map(file -> WorkerLogs.getTopologyPortWorkerLog(file)) + .sorted().collect(toList()); + } else { + files = new ArrayList<>(); + } + + return LogviewerResponseBuilder.buildSuccessJsonResponse(files, callback, origin); + } + + public Response logPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException { + String rootDir = logRoot; + if (resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) { + File file = new File(rootDir, fileName).getCanonicalFile(); + String path = file.getCanonicalPath(); + boolean isZipFile = path.endsWith(".gz"); + File topoDir = file.getParentFile().getParentFile(); + + if (file.exists() && new File(rootDir).getCanonicalFile().equals(topoDir.getParentFile())) { + long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); + + SortedSet<File> logFiles; + try { + logFiles = Arrays.stream(topoDir.listFiles()) + .flatMap(Unchecked.function(portDir -> DirectoryCleaner.getFilesForDir(portDir).stream())) + .filter(File::isFile) + .collect(toCollection(TreeSet::new)); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + + List<String> filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog) + .filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList()); + + List<String> reorderedFilesStr = new ArrayList<>(); + reorderedFilesStr.addAll(filesStrWithoutFileParam); + reorderedFilesStr.add(fileName); + + length = length != null ? Math.min(10485760, length) : DEFAULT_BYTES_PER_PAGE; + + String logString; + if (isTxtFile(fileName)) { + logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); + } else { + logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); + } + + start = start != null ? start : Long.valueOf(fileLength - length).intValue(); + + List<DomContent> bodyContents = new ArrayList<>(); + if (StringUtils.isNotEmpty(grep)) { + String matchedString = String.join("\n", Arrays.stream(logString.split("\n")) + .filter(str -> str.contains(grep)).collect(toList())); + bodyContents.add(pre(matchedString).withId("logContent")); + } else { + DomContent pagerData = null; + if (isTxtFile(fileName)) { + pagerData = pagerLinks(fileName, start, length, Long.valueOf(fileLength).intValue(), "log"); + } + + bodyContents.add(searchFileForm(fileName, "no")); + // list all files for this topology + bodyContents.add(logFileSelectionForm(reorderedFilesStr, "log")); + if (pagerData != null) { + bodyContents.add(pagerData); + } + bodyContents.add(downloadLink(fileName)); + bodyContents.add(pre(logString).withClass("logContent")); + if (pagerData != null) { + bodyContents.add(pagerData); + } + } + + String content = logTemplate(bodyContents, fileName, user).render(); + return LogviewerResponseBuilder.buildSuccessHtmlResponse(content); + } else { + return LogviewerResponseBuilder.buildResponsePageNotFound(); + } + } else { + if (resourceAuthorizer.getLogUserGroupWhitelist(fileName) != null) { + return LogviewerResponseBuilder.buildResponsePageNotFound(); + } else { + return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user); + } + } + } + + public Response daemonLogPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException { + String rootDir = daemonLogRoot; + File file = new File(rootDir, fileName).getCanonicalFile(); + String path = file.getCanonicalPath(); + boolean isZipFile = path.endsWith(".gz"); + + if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) { + long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); + + // all types of files included + List<File> logFiles = Arrays.stream(new File(rootDir).listFiles()) + .filter(File::isFile) + .collect(toList()); + + List<String> filesStrWithoutFileParam = logFiles.stream() + .map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList()); + + List<String> reorderedFilesStr = new ArrayList<>(); + reorderedFilesStr.addAll(filesStrWithoutFileParam); + reorderedFilesStr.add(fileName); + + length = length != null ? Math.min(10485760, length) : DEFAULT_BYTES_PER_PAGE; + + String logString; + if (isTxtFile(fileName)) { + logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); + } else { + logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); + } + + start = start != null ? start : Long.valueOf(fileLength - length).intValue(); + + List<DomContent> bodyContents = new ArrayList<>(); + if (StringUtils.isNotEmpty(grep)) { + String matchedString = String.join("\n", Arrays.stream(logString.split("\n")) + .filter(str -> str.contains(grep)).collect(toList())); + bodyContents.add(pre(matchedString).withId("logContent")); + } else { + DomContent pagerData = null; + if (isTxtFile(fileName)) { + pagerData = pagerLinks(fileName, start, length, Long.valueOf(fileLength).intValue(), "daemonlog"); + } + + bodyContents.add(searchFileForm(fileName, "yes")); + // list all daemon logs + bodyContents.add(logFileSelectionForm(reorderedFilesStr, "daemonlog")); + if (pagerData != null) { + bodyContents.add(pagerData); + } + bodyContents.add(daemonDownloadLink(fileName)); + bodyContents.add(pre(logString).withClass("logContent")); + if (pagerData != null) { + bodyContents.add(pagerData); + } + } + + String content = logTemplate(bodyContents, fileName, user).render(); + return LogviewerResponseBuilder.buildSuccessHtmlResponse(content); + } else { + return LogviewerResponseBuilder.buildResponsePageNotFound(); + } + } + + private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) { + List<DomContent> finalBodyContents = new ArrayList<>(); + + if (StringUtils.isNotBlank(user)) { + finalBodyContents.add(div(p("User: " + user)).withClass("ui-user")); + } + + finalBodyContents.add(div(p("Note: the drop-list shows at most 1024 files for each worker directory.")).withClass("ui-note")); + finalBodyContents.add(h3(escapeHtml(fileName))); + finalBodyContents.addAll(bodyContents); + + return html( + head( + title(escapeHtml(fileName) + " - Storm Log Viewer"), + link().withRel("stylesheet").withHref("/css/bootstrap-3.3.1.min.css"), + link().withRel("stylesheet").withHref("/css/jquery.dataTables.1.10.4.min.css"), + link().withRel("stylesheet").withHref("/css/style.css") + ), + body( + finalBodyContents.toArray(new DomContent[]{}) + ) + ); + } + + private DomContent downloadLink(String fileName) { + return p(linkTo(UIHelpers.urlFormat("/api/v1/download?file=%s", fileName), "Download Full File")); + } + + private DomContent daemonDownloadLink(String fileName) { + return p(linkTo(UIHelpers.urlFormat("/api/v1/daemondownload?file=%s", fileName), "Download Full File")); + } + + private DomContent linkTo(String url, String content) { + return a(content).withHref(url); + } + + private DomContent logFileSelectionForm(List<String> logFiles, String type) { + return form( + dropDown("file", logFiles), + input().withType("submit").withValue("Switch file") + ).withAction(type).withId("list-of-files"); + } + + private DomContent dropDown(String name, List<String> logFiles) { + List<DomContent> options = logFiles.stream().map(TagCreator::option).collect(toList()); + return select(options.toArray(new DomContent[]{})).withName(name).withId(name); + } + + private DomContent searchFileForm(String fileName, String isDaemonValue) { + return form( + text("search this file:"), + input().withType("text").withName("search"), + input().withType("hidden").withName("is-daemon").withValue(isDaemonValue), + input().withType("hidden").withName("file").withValue(fileName), + input().withType("submit").withValue("Search") + ).withAction("/logviewer_search.html").withId("search-box"); + } + + private DomContent pagerLinks(String fileName, Integer start, Integer length, Integer fileLength, String type) { + int prevStart = Math.max(0, start - length); + int nextStart = fileLength > 0 ? Math.min(Math.max(0, fileLength - length), start + length) : start + length; + List<DomContent> btnLinks = new ArrayList<>(); + + Map<String, Object> urlQueryParams = new HashMap<>(); + urlQueryParams.put("file", fileName); + urlQueryParams.put("start", Math.max(0, start - length)); + urlQueryParams.put("length", length); + + btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "Prev", prevStart < start)); + + urlQueryParams.clear(); + urlQueryParams.put("file", fileName); + urlQueryParams.put("start", 0); + urlQueryParams.put("length", length); + + btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "First")); + + urlQueryParams.clear(); + urlQueryParams.put("file", fileName); + urlQueryParams.put("length", length); + + btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "Last")); + + urlQueryParams.clear(); + urlQueryParams.put("file", fileName); + urlQueryParams.put("start", Math.min(Math.max(0, fileLength - length), start + length)); + urlQueryParams.put("length", length); + + btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "Next", nextStart > start)); + + return div(btnLinks.toArray(new DomContent[]{})); + } + + private DomContent toButtonLink(String url, String text) { + return toButtonLink(url, text, true); + } + + private DomContent toButtonLink(String url, String text, boolean enabled) { + return a(text).withHref(url).withClass("btn btn-default " + (enabled ? "enabled" : "disabled")); + } + + private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException { + boolean isZipFile = path.endsWith(".gz"); + long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); + long skip = fileLength - tail; + return pageFile(path, Long.valueOf(skip).intValue(), tail); + } + + private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException { + boolean isZipFile = path.endsWith(".gz"); + long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); + + try (InputStream input = isZipFile ? new GZIPInputStream(new FileInputStream(path)) : new FileInputStream(path); + ByteArrayOutputStream output = new ByteArrayOutputStream()) { + if (start >= fileLength) { + throw new InvalidRequestException("Cannot start past the end of the file"); + } + if (start > 0) { + StreamUtil.skipBytes(input, start); + } + + byte[] buffer = new byte[1024]; + while (output.size() < length) { + int size = input.read(buffer, 0, Math.min(1024, length - output.size())); + if (size > 0) { + output.write(buffer, 0, size); + } else { + break; + } + } + + return output.toString(); + } + } + + private boolean isTxtFile(String fileName) { + Pattern p = Pattern.compile("\\.(log.*|txt|yaml|pid)$"); + Matcher matcher = p.matcher(fileName); + return matcher.find(); + } +}
