http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
new file mode 100644
index 0000000..bc6529c
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.webapp;
+
+import com.codahale.metrics.Meter;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.daemon.common.JsonResponseBuilder;
+import org.apache.storm.daemon.logviewer.handler.LogviewerLogSearchHandler;
+import org.apache.storm.daemon.logviewer.handler.LogviewerProfileHandler;
+import org.apache.storm.daemon.logviewer.handler.LogviewerLogDownloadHandler;
+import org.apache.storm.daemon.logviewer.handler.LogviewerLogPageHandler;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.ui.InvalidRequestException;
+import org.apache.storm.ui.UIHelpers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * JAX-RS resource exposing the logviewer HTTP API: log pages, log file listings, log downloads,
+ * profiler dump files and log searches.
+ *
+ * <p>Every endpoint resolves the requesting user through the configured {@link IHttpCredentialsPlugin}
+ * and delegates the actual work to one of the logviewer handlers.
+ */
+@Path("/")
+public class LogviewerResource {
+    private static final Logger LOG = LoggerFactory.getLogger(LogviewerResource.class);
+
+    // Charset for URL-decoding file names; explicit so the result does not depend on the platform default.
+    private static final String FILE_NAME_ENCODING = StandardCharsets.UTF_8.name();
+
+    private static final Meter meterLogPageHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-log-page-http-requests");
+    private static final Meter meterDaemonLogPageHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-daemonlog-page-http-requests");
+    private static final Meter meterDownloadLogFileHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-download-log-file-http-requests");
+    private static final Meter meterDownloadLogDaemonFileHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-download-log-daemon-file-http-requests");
+    private static final Meter meterListLogsHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-list-logs-http-requests");
+
+    private final LogviewerLogPageHandler logviewer;
+    private final LogviewerProfileHandler profileHandler;
+    private final LogviewerLogDownloadHandler logDownloadHandler;
+    private final LogviewerLogSearchHandler logSearchHandler;
+    private final IHttpCredentialsPlugin httpCredsHandler;
+
+    /**
+     * Creates the resource.
+     *
+     * @param logviewerParam handler serving log pages and log file listings
+     * @param profileHandler handler listing and serving profiler dump files
+     * @param logDownloadHandler handler serving log file downloads
+     * @param logSearchHandler handler searching log files
+     * @param httpCredsHandler plugin used to extract the user name from a request
+     */
+    public LogviewerResource(LogviewerLogPageHandler logviewerParam, LogviewerProfileHandler profileHandler,
+                             LogviewerLogDownloadHandler logDownloadHandler, LogviewerLogSearchHandler logSearchHandler,
+                             IHttpCredentialsPlugin httpCredsHandler) {
+        this.logviewer = logviewerParam;
+        this.profileHandler = profileHandler;
+        this.logDownloadHandler = logDownloadHandler;
+        this.logSearchHandler = logSearchHandler;
+        this.httpCredsHandler = httpCredsHandler;
+    }
+
+    /**
+     * Renders a page of a worker log file.
+     *
+     * <p>Query parameters: {@code file} (required), {@code start}, {@code length} and {@code grep} (optional).
+     * Responds with 400 when {@code file} is missing or {@code start}/{@code length} is not an integer.
+     */
+    @GET
+    @Path("/log")
+    public Response log(@Context HttpServletRequest request) throws IOException {
+        meterLogPageHttpRequests.mark();
+
+        try {
+            String user = httpCredsHandler.getUserName(request);
+            Integer start = parseOptionalInteger(request, "start");
+            Integer length = parseOptionalInteger(request, "length");
+            String file = request.getParameter("file");
+            if (file == null) {
+                return missingParameter("file");
+            }
+            String grep = request.getParameter("grep");
+            return logviewer.logPage(decodeFileName(file), start, length, grep, user);
+        } catch (InvalidRequestException e) {
+            LOG.error(e.getMessage(), e);
+            return Response.status(400).entity(e.getMessage()).build();
+        }
+    }
+
+    /**
+     * Renders a page of a daemon log file; same query parameters and error responses as {@code /log}.
+     */
+    @GET
+    @Path("/daemonlog")
+    public Response daemonLog(@Context HttpServletRequest request) throws IOException {
+        meterDaemonLogPageHttpRequests.mark();
+
+        try {
+            String user = httpCredsHandler.getUserName(request);
+            Integer start = parseOptionalInteger(request, "start");
+            Integer length = parseOptionalInteger(request, "length");
+            String file = request.getParameter("file");
+            if (file == null) {
+                return missingParameter("file");
+            }
+            String grep = request.getParameter("grep");
+            return logviewer.daemonLogPage(decodeFileName(file), start, length, grep, user);
+        } catch (InvalidRequestException e) {
+            LOG.error(e.getMessage(), e);
+            return Response.status(400).entity(e.getMessage()).build();
+        }
+    }
+
+    /**
+     * Lists log files, optionally restricted by the {@code topoId} and {@code port} query parameters.
+     * Returns the same listing as {@code /listLogs} but is not counted by the list-logs meter.
+     */
+    @GET
+    @Path("/searchLogs")
+    public Response searchLogs(@Context HttpServletRequest request) throws IOException {
+        return listLogFiles(request);
+    }
+
+    /**
+     * Lists log files, optionally restricted by the {@code topoId} and {@code port} query parameters.
+     */
+    @GET
+    @Path("/listLogs")
+    public Response listLogs(@Context HttpServletRequest request) throws IOException {
+        meterListLogsHttpRequests.mark();
+
+        return listLogFiles(request);
+    }
+
+    /**
+     * Lists the profiler dump files for the given topology id and host-port.
+     */
+    @GET
+    @Path("/dumps/{topo-id}/{host-port}")
+    public Response listDumpFiles(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
+                                  @Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        return profileHandler.listDumpFiles(topologyId, hostPort, user);
+    }
+
+    /**
+     * Downloads one profiler dump file for the given topology id and host-port.
+     */
+    @GET
+    @Path("/dumps/{topo-id}/{host-port}/{filename}")
+    public Response downloadDumpFile(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
+                                     @PathParam("filename") String fileName, @Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user);
+    }
+
+    /**
+     * Downloads the worker log file named by the required {@code file} query parameter; 400 when it is missing.
+     */
+    @GET
+    @Path("/download")
+    public Response downloadLogFile(@Context HttpServletRequest request) throws IOException {
+        meterDownloadLogFileHttpRequests.mark();
+
+        String user = httpCredsHandler.getUserName(request);
+        String file = request.getParameter("file");
+        if (file == null) {
+            return missingParameter("file");
+        }
+        return logDownloadHandler.downloadLogFile(decodeFileName(file), user);
+    }
+
+    /**
+     * Downloads the daemon log file named by the required {@code file} query parameter; 400 when it is missing.
+     */
+    @GET
+    @Path("/daemondownload")
+    public Response downloadDaemonLogFile(@Context HttpServletRequest request) throws IOException {
+        meterDownloadLogDaemonFileHttpRequests.mark();
+
+        String user = httpCredsHandler.getUserName(request);
+        String file = request.getParameter("file");
+        if (file == null) {
+            return missingParameter("file");
+        }
+        return logDownloadHandler.downloadDaemonLogFile(decodeFileName(file), user);
+    }
+
+    /**
+     * Searches a single log file for the {@code search-string} query parameter.
+     * A request rejected by the search handler is answered with a 400 JSON response.
+     */
+    @GET
+    @Path("/search/{file}")
+    public Response search(@PathParam("file") String file, @Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        boolean isDaemon = StringUtils.equals(request.getParameter("is-daemon"), "yes");
+        String decodedFileName = decodeFileName(file);
+        String searchString = request.getParameter("search-string");
+        String numMatchesStr = request.getParameter("num-matches");
+        String startByteOffset = request.getParameter("start-byte-offset");
+        String callback = request.getParameter("callback");
+        String origin = request.getHeader("Origin");
+
+        try {
+            return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon, searchString, numMatchesStr,
+                    startByteOffset, callback, origin);
+        } catch (InvalidRequestException e) {
+            LOG.error(e.getMessage(), e);
+            return jsonBadRequest(e, callback);
+        }
+    }
+
+    /**
+     * Searches the log files of a whole topology for the {@code search-string} query parameter.
+     */
+    @GET
+    @Path("/deepSearch/{topoId}")
+    public Response deepSearch(@PathParam("topoId") String topologyId,
+                               @Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        String searchString = request.getParameter("search-string");
+        String numMatchesStr = request.getParameter("num-matches");
+        String portStr = request.getParameter("port");
+        String startFileOffset = request.getParameter("start-file-offset");
+        String startByteOffset = request.getParameter("start-byte-offset");
+        String searchArchived = request.getParameter("search-archived");
+        String callback = request.getParameter("callback");
+        String origin = request.getHeader("Origin");
+
+        return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr,
+                startFileOffset, startByteOffset, BooleanUtils.toBooleanObject(searchArchived), callback, origin);
+    }
+
+    // Shared implementation of /listLogs and /searchLogs; a non-numeric port yields a 400 JSON response.
+    private Response listLogFiles(HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        String topologyId = request.getParameter("topoId");
+        String callback = request.getParameter("callback");
+        String origin = request.getHeader("Origin");
+
+        Integer port;
+        try {
+            port = parseOptionalInteger(request, "port");
+        } catch (InvalidRequestException e) {
+            LOG.error(e.getMessage(), e);
+            return jsonBadRequest(e, callback);
+        }
+        return logviewer.listLogFiles(user, port, topologyId, callback, origin);
+    }
+
+    // Decodes a URL-encoded file name as UTF-8 instead of relying on the platform default charset.
+    private static String decodeFileName(String encodedFileName) throws IOException {
+        return URLDecoder.decode(encodedFileName, FILE_NAME_ENCODING);
+    }
+
+    // Returns the named query parameter as an Integer, or null when the parameter is absent.
+    private static Integer parseOptionalInteger(HttpServletRequest request, String parameterKey)
+            throws InvalidRequestException {
+        String rawValue = request.getParameter(parameterKey);
+        if (rawValue == null) {
+            return null;
+        }
+        try {
+            return Integer.valueOf(rawValue);
+        } catch (NumberFormatException ex) {
+            throw new InvalidRequestException("Could not make an integer out of the query parameter '"
+                    + parameterKey + "'", ex);
+        }
+    }
+
+    // Plain-text 400 response for a request lacking a required query parameter.
+    private static Response missingParameter(String parameterKey) {
+        return Response.status(400).entity("Missing required query parameter '" + parameterKey + "'").build();
+    }
+
+    // JSON(P) 400 response, used by the endpoints that answer in JSON.
+    private static Response jsonBadRequest(InvalidRequestException e, String callback) {
+        return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(e)).setCallback(callback)
+                .setStatus(400).build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java deleted file mode 100644 index 01cc0bc..0000000 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerConstant.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.daemon.wip.logviewer; - -public class LogviewerConstant { - public static final int DEFAULT_BYTES_PER_PAGE = 51200; -} http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java deleted file mode 100644 index 24ccf6e..0000000 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/LogviewerServer.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.daemon.wip.logviewer; - -import com.codahale.metrics.Meter; -import com.google.common.annotations.VisibleForTesting; -import org.apache.storm.DaemonConfig; -import org.apache.storm.daemon.drpc.webapp.ReqContextFilter; -import org.apache.storm.daemon.wip.logviewer.utils.LogCleaner; -import org.apache.storm.daemon.wip.logviewer.webapp.LogviewerApplication; -import org.apache.storm.metric.StormMetricsRegistry; -import org.apache.storm.security.auth.AuthUtils; -import org.apache.storm.security.auth.IHttpCredentialsPlugin; -import org.apache.storm.ui.FilterConfiguration; -import org.apache.storm.ui.UIHelpers; -import org.apache.storm.utils.ObjectReader; -import org.apache.storm.utils.Utils; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.FilterHolder; -import org.eclipse.jetty.servlet.FilterMapping; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.util.resource.Resource; -import org.glassfish.jersey.servlet.ServletContainer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES; - -public class LogviewerServer implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class); - private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls"); - public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public"; - - private static Server mkHttpServer(Map<String, Object> conf) { - Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT); - Server ret = null; - if (logviewerHttpPort != null && logviewerHttpPort >= 0) { - LOG.info("Starting Logviewer HTTP servers..."); - Integer headerBufferSize = 
ObjectReader.getInt(conf.get(UI_HEADER_BUFFER_BYTES)); - String filterClass = (String) (conf.get(DaemonConfig.UI_FILTER)); - @SuppressWarnings("unchecked") - Map<String, String> filterParams = (Map<String, String>) (conf.get(DaemonConfig.UI_FILTER_PARAMS)); - FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams); - final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration); - - final Integer httpsPort = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_HTTPS_PORT), 0); - final String httpsKsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PATH)); - final String httpsKsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PASSWORD)); - final String httpsKsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_TYPE)); - final String httpsKeyPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEY_PASSWORD)); - final String httpsTsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PATH)); - final String httpsTsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD)); - final String httpsTsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_TYPE)); - final Boolean httpsWantClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_WANT_CLIENT_AUTH)); - final Boolean httpsNeedClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_NEED_CLIENT_AUTH)); - - //TODO a better way to do this would be great. 
- LogviewerApplication.setup(conf); - ret = UIHelpers.jettyCreateServer(logviewerHttpPort, null, httpsPort); - - UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, - httpsTsPath, httpsTsPassword, httpsTsType, httpsNeedClientAuth, httpsWantClientAuth); - - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - try { - context.setBaseResource(Resource.newResource(STATIC_RESOURCE_DIRECTORY_PATH)); - } catch (IOException e) { - throw new RuntimeException("Can't locate static resource directory " + STATIC_RESOURCE_DIRECTORY_PATH); - } - - context.setContextPath("/"); - ret.setHandler(context); - - ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class); - holderPwd.setInitOrder(1); - context.addServlet(holderPwd,"/"); - - ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/api/v1/*"); - jerseyServlet.setInitOrder(2); - jerseyServlet.setInitParameter("javax.ws.rs.Application", LogviewerApplication.class.getName()); - - UIHelpers.configFilters(context, filterConfigurations); - } - return ret; - } - - private final Server httpServer; - private boolean closed = false; - - /** - * Constructor. - * @param conf Logviewer conf for the servers - */ - public LogviewerServer(Map<String, Object> conf) { - httpServer = mkHttpServer(conf); - } - - @VisibleForTesting - void start() throws Exception { - LOG.info("Starting Logviewer..."); - if (httpServer != null) { - httpServer.start(); - } - } - - @VisibleForTesting - void awaitTermination() throws InterruptedException { - httpServer.join(); - } - - @Override - public synchronized void close() { - if (!closed) { - //This is kind of useless... - meterShutdownCalls.mark(); - - //TODO this is causing issues... - //if (httpServer != null) { - // httpServer.destroy(); - //} - - closed = true; - } - } - - /** - * @return The port the HTTP server is listening on. Not available until {@link #start() } has run. 
- */ - public int getHttpServerPort() { - assert httpServer.getConnectors().length == 1; - - return httpServer.getConnectors()[0].getLocalPort(); - } - - /** - * Main method to start the server. - */ - public static void main(String [] args) throws Exception { - Utils.setupDefaultUncaughtExceptionHandler(); - Map<String, Object> conf = Utils.readStormConfig(); - - try (LogviewerServer server = new LogviewerServer(conf); - LogCleaner logCleaner = new LogCleaner(conf)) { - Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close()); - logCleaner.start(); - StormMetricsRegistry.startMetricsReporters(conf); - server.start(); - server.awaitTermination(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java deleted file mode 100644 index 26d1d63..0000000 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogDownloadHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.daemon.wip.logviewer.handler; - -import org.apache.storm.daemon.wip.logviewer.utils.LogFileDownloader; -import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; - -import javax.ws.rs.core.Response; -import java.io.IOException; - -public class LogviewerLogDownloadHandler { - - private final LogFileDownloader logFileDownloadHelper; - - public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) { - this.logFileDownloadHelper = new LogFileDownloader(logRoot, daemonLogRoot, resourceAuthorizer); - } - - public Response downloadLogFile(String fileName, String user) throws IOException { - return logFileDownloadHelper.downloadFile(fileName, user, false); - } - - public Response downloadDaemonLogFile(String fileName, String user) throws IOException { - return logFileDownloadHelper.downloadFile(fileName, user, true); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java deleted file mode 100644 index 0e623f5..0000000 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogPageHandler.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.daemon.wip.logviewer.handler; - -import j2html.TagCreator; -import j2html.tags.DomContent; -import org.apache.commons.lang.StringUtils; -import org.apache.storm.daemon.DirectoryCleaner; -import org.apache.storm.daemon.utils.StreamUtil; -import org.apache.storm.daemon.utils.URLBuilder; -import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder; -import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; -import org.apache.storm.daemon.wip.logviewer.utils.WorkerLogs; -import org.apache.storm.ui.InvalidRequestException; -import org.apache.storm.ui.UIHelpers; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.ServerUtils; -import org.apache.storm.utils.Utils; -import org.jooq.lambda.Unchecked; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Set; -import java.util.SortedSet; 
-import java.util.TreeSet; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.zip.GZIPInputStream; - -import javax.ws.rs.core.Response; - -import static j2html.TagCreator.*; -import static java.util.stream.Collectors.toCollection; -import static java.util.stream.Collectors.toList; -import static org.apache.commons.lang.StringEscapeUtils.escapeHtml; -import static org.apache.storm.daemon.wip.logviewer.LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - -public class LogviewerLogPageHandler { - private final String logRoot; - private final String daemonLogRoot; - private final ResourceAuthorizer resourceAuthorizer; - - public LogviewerLogPageHandler(String logRoot, String daemonLogRoot, - ResourceAuthorizer resourceAuthorizer) { - this.logRoot = logRoot; - this.daemonLogRoot = daemonLogRoot; - this.resourceAuthorizer = resourceAuthorizer; - } - - public Response listLogFiles(String user, Integer port, String topologyId, String callback, String origin) throws IOException { - List<File> fileResults = null; - if (topologyId == null) { - if (port == null) { - fileResults = WorkerLogs.getAllLogsForRootDir(new File(logRoot)); - } else { - fileResults = new ArrayList<>(); - - File[] logRootFiles = new File(logRoot).listFiles(); - if (logRootFiles != null) { - for (File topoDir : logRootFiles) { - File[] topoDirFiles = topoDir.listFiles(); - if (topoDirFiles != null) { - for (File portDir : topoDirFiles) { - if (portDir.getName().equals(port.toString())) { - fileResults.addAll(DirectoryCleaner.getFilesForDir(portDir)); - } - } - } - } - } - } - } else { - if (port == null) { - fileResults = new ArrayList<>(); - - File topoDir = new File(logRoot + Utils.FILE_PATH_SEPARATOR + topologyId); - if (topoDir.exists()) { - File[] topoDirFiles = topoDir.listFiles(); - if (topoDirFiles != null) { - for (File portDir : topoDirFiles) { - fileResults.addAll(DirectoryCleaner.getFilesForDir(portDir)); - } - } - } - - } else { - File portDir = 
ConfigUtils.getWorkerDirFromRoot(logRoot, topologyId, port); - if (portDir.exists()) { - fileResults = DirectoryCleaner.getFilesForDir(portDir); - } - } - } - - List<String> files; - if (fileResults != null) { - files = fileResults.stream() - .map(file -> WorkerLogs.getTopologyPortWorkerLog(file)) - .sorted().collect(toList()); - } else { - files = new ArrayList<>(); - } - - return LogviewerResponseBuilder.buildSuccessJsonResponse(files, callback, origin); - } - - public Response logPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException { - String rootDir = logRoot; - if (resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) { - File file = new File(rootDir, fileName).getCanonicalFile(); - String path = file.getCanonicalPath(); - boolean isZipFile = path.endsWith(".gz"); - File topoDir = file.getParentFile().getParentFile(); - - if (file.exists() && new File(rootDir).getCanonicalFile().equals(topoDir.getParentFile())) { - long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); - - SortedSet<File> logFiles; - try { - logFiles = Arrays.stream(topoDir.listFiles()) - .flatMap(Unchecked.function(portDir -> DirectoryCleaner.getFilesForDir(portDir).stream())) - .filter(File::isFile) - .collect(toCollection(TreeSet::new)); - } catch (UncheckedIOException e) { - throw e.getCause(); - } - - List<String> filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog) - .filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList()); - - List<String> reorderedFilesStr = new ArrayList<>(); - reorderedFilesStr.addAll(filesStrWithoutFileParam); - reorderedFilesStr.add(fileName); - - length = length != null ? Math.min(10485760, length) : DEFAULT_BYTES_PER_PAGE; - - String logString; - if (isTxtFile(fileName)) { - logString = escapeHtml(start != null ? 
pageFile(path, start, length) : pageFile(path, length)); - } else { - logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); - } - - start = start != null ? start : Long.valueOf(fileLength - length).intValue(); - - List<DomContent> bodyContents = new ArrayList<>(); - if (StringUtils.isNotEmpty(grep)) { - String matchedString = String.join("\n", Arrays.stream(logString.split("\n")) - .filter(str -> str.contains(grep)).collect(toList())); - bodyContents.add(pre(matchedString).withId("logContent")); - } else { - DomContent pagerData = null; - if (isTxtFile(fileName)) { - pagerData = pagerLinks(fileName, start, length, Long.valueOf(fileLength).intValue(), "log"); - } - - bodyContents.add(searchFileForm(fileName, "no")); - // list all files for this topology - bodyContents.add(logFileSelectionForm(reorderedFilesStr, "log")); - if (pagerData != null) { - bodyContents.add(pagerData); - } - bodyContents.add(downloadLink(fileName)); - bodyContents.add(pre(logString).withClass("logContent")); - if (pagerData != null) { - bodyContents.add(pagerData); - } - } - - String content = logTemplate(bodyContents, fileName, user).render(); - return LogviewerResponseBuilder.buildSuccessHtmlResponse(content); - } else { - return LogviewerResponseBuilder.buildResponsePageNotFound(); - } - } else { - if (resourceAuthorizer.getLogUserGroupWhitelist(fileName) != null) { - return LogviewerResponseBuilder.buildResponsePageNotFound(); - } else { - return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user); - } - } - } - - public Response daemonLogPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException { - String rootDir = daemonLogRoot; - File file = new File(rootDir, fileName).getCanonicalFile(); - String path = file.getCanonicalPath(); - boolean isZipFile = path.endsWith(".gz"); - - if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) { - 
long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); - - // all types of files included - List<File> logFiles = Arrays.stream(new File(rootDir).listFiles()) - .filter(File::isFile) - .collect(toList()); - - List<String> filesStrWithoutFileParam = logFiles.stream() - .map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList()); - - List<String> reorderedFilesStr = new ArrayList<>(); - reorderedFilesStr.addAll(filesStrWithoutFileParam); - reorderedFilesStr.add(fileName); - - length = length != null ? Math.min(10485760, length) : DEFAULT_BYTES_PER_PAGE; - - String logString; - if (isTxtFile(fileName)) { - logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); - } else { - logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); - } - - start = start != null ? start : Long.valueOf(fileLength - length).intValue(); - - List<DomContent> bodyContents = new ArrayList<>(); - if (StringUtils.isNotEmpty(grep)) { - String matchedString = String.join("\n", Arrays.stream(logString.split("\n")) - .filter(str -> str.contains(grep)).collect(toList())); - bodyContents.add(pre(matchedString).withId("logContent")); - } else { - DomContent pagerData = null; - if (isTxtFile(fileName)) { - pagerData = pagerLinks(fileName, start, length, Long.valueOf(fileLength).intValue(), "daemonlog"); - } - - bodyContents.add(searchFileForm(fileName, "yes")); - // list all daemon logs - bodyContents.add(logFileSelectionForm(reorderedFilesStr, "daemonlog")); - if (pagerData != null) { - bodyContents.add(pagerData); - } - bodyContents.add(daemonDownloadLink(fileName)); - bodyContents.add(pre(logString).withClass("logContent")); - if (pagerData != null) { - bodyContents.add(pagerData); - } - } - - String content = logTemplate(bodyContents, fileName, user).render(); - return LogviewerResponseBuilder.buildSuccessHtmlResponse(content); - } else { - return 
LogviewerResponseBuilder.buildResponsePageNotFound(); - } - } - - private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) { - List<DomContent> finalBodyContents = new ArrayList<>(); - - if (StringUtils.isNotBlank(user)) { - finalBodyContents.add(div(p("User: " + user)).withClass("ui-user")); - } - - finalBodyContents.add(div(p("Note: the drop-list shows at most 1024 files for each worker directory.")).withClass("ui-note")); - finalBodyContents.add(h3(escapeHtml(fileName))); - finalBodyContents.addAll(bodyContents); - - return html( - head( - title(escapeHtml(fileName) + " - Storm Log Viewer"), - link().withRel("stylesheet").withHref("/css/bootstrap-3.3.1.min.css"), - link().withRel("stylesheet").withHref("/css/jquery.dataTables.1.10.4.min.css"), - link().withRel("stylesheet").withHref("/css/style.css") - ), - body( - finalBodyContents.toArray(new DomContent[]{}) - ) - ); - } - - private DomContent downloadLink(String fileName) { - return p(linkTo(UIHelpers.urlFormat("/api/v1/download?file=%s", fileName), "Download Full File")); - } - - private DomContent daemonDownloadLink(String fileName) { - return p(linkTo(UIHelpers.urlFormat("/api/v1/daemondownload?file=%s", fileName), "Download Full File")); - } - - private DomContent linkTo(String url, String content) { - return a(content).withHref(url); - } - - private DomContent logFileSelectionForm(List<String> logFiles, String type) { - return form( - dropDown("file", logFiles), - input().withType("submit").withValue("Switch file") - ).withAction(type).withId("list-of-files"); - } - - private DomContent dropDown(String name, List<String> logFiles) { - List<DomContent> options = logFiles.stream().map(TagCreator::option).collect(toList()); - return select(options.toArray(new DomContent[]{})).withName(name).withId(name); - } - - private DomContent searchFileForm(String fileName, String isDaemonValue) { - return form( - text("search this file:"), - 
input().withType("text").withName("search"), - input().withType("hidden").withName("is-daemon").withValue(isDaemonValue), - input().withType("hidden").withName("file").withValue(fileName), - input().withType("submit").withValue("Search") - ).withAction("/logviewer_search.html").withId("search-box"); - } - - private DomContent pagerLinks(String fileName, Integer start, Integer length, Integer fileLength, String type) { - int prevStart = Math.max(0, start - length); - int nextStart = fileLength > 0 ? Math.min(Math.max(0, fileLength - length), start + length) : start + length; - List<DomContent> btnLinks = new ArrayList<>(); - - Map<String, Object> urlQueryParams = new HashMap<>(); - urlQueryParams.put("file", fileName); - urlQueryParams.put("start", Math.max(0, start - length)); - urlQueryParams.put("length", length); - - btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "Prev", prevStart < start)); - - urlQueryParams.clear(); - urlQueryParams.put("file", fileName); - urlQueryParams.put("start", 0); - urlQueryParams.put("length", length); - - btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "First")); - - urlQueryParams.clear(); - urlQueryParams.put("file", fileName); - urlQueryParams.put("length", length); - - btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "Last")); - - urlQueryParams.clear(); - urlQueryParams.put("file", fileName); - urlQueryParams.put("start", Math.min(Math.max(0, fileLength - length), start + length)); - urlQueryParams.put("length", length); - - btnLinks.add(toButtonLink(URLBuilder.build("/api/v1/" + type, urlQueryParams), "Next", nextStart > start)); - - return div(btnLinks.toArray(new DomContent[]{})); - } - - private DomContent toButtonLink(String url, String text) { - return toButtonLink(url, text, true); - } - - private DomContent toButtonLink(String url, String text, boolean enabled) { - return a(text).withHref(url).withClass("btn btn-default " + 
(enabled ? "enabled" : "disabled")); - } - - private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException { - boolean isZipFile = path.endsWith(".gz"); - long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); - long skip = fileLength - tail; - return pageFile(path, Long.valueOf(skip).intValue(), tail); - } - - private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException { - boolean isZipFile = path.endsWith(".gz"); - long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); - - try (InputStream input = isZipFile ? new GZIPInputStream(new FileInputStream(path)) : new FileInputStream(path); - ByteArrayOutputStream output = new ByteArrayOutputStream()) { - if (start >= fileLength) { - throw new InvalidRequestException("Cannot start past the end of the file"); - } - if (start > 0) { - StreamUtil.skipBytes(input, start); - } - - byte[] buffer = new byte[1024]; - while (output.size() < length) { - int size = input.read(buffer, 0, Math.min(1024, length - output.size())); - if (size > 0) { - output.write(buffer, 0, size); - } else { - break; - } - } - - return output.toString(); - } - } - - private boolean isTxtFile(String fileName) { - Pattern p = Pattern.compile("\\.(log.*|txt|yaml|pid)$"); - Matcher matcher = p.matcher(fileName); - return matcher.find(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java deleted file mode 100644 index c0a69cb..0000000 --- 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java +++ /dev/null @@ -1,686 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.daemon.wip.logviewer.handler; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang.BooleanUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.storm.DaemonConfig; -import org.apache.storm.daemon.DirectoryCleaner; -import org.apache.storm.daemon.common.JsonResponseBuilder; -import org.apache.storm.daemon.utils.StreamUtil; -import org.apache.storm.daemon.utils.URLBuilder; -import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder; -import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; -import org.apache.storm.daemon.wip.logviewer.utils.WorkerLogs; -import org.apache.storm.ui.InvalidRequestException; -import org.apache.storm.utils.ObjectReader; -import org.apache.storm.utils.ServerUtils; -import org.apache.storm.utils.Utils; -import org.json.simple.JSONAware; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.core.Response; -import 
java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; -import java.util.zip.GZIPInputStream; - -import static java.util.stream.Collectors.toList; -import static org.apache.storm.daemon.utils.ListFunctionalSupport.drop; -import static org.apache.storm.daemon.utils.ListFunctionalSupport.first; -import static org.apache.storm.daemon.utils.ListFunctionalSupport.last; -import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest; -import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast; -import static org.apache.storm.daemon.wip.logviewer.LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - -public class LogviewerLogSearchHandler { - private final static Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class); - - public static final int GREP_MAX_SEARCH_SIZE = 1024; - public static final int GREP_BUF_SIZE = 2048; - public static final int GREP_CONTEXT_SIZE = 128; - public static final Pattern WORKER_LOG_FILENAME_PATTERN = Pattern.compile("^worker.log(.*)"); - - private final Map<String, Object> stormConf; - private final String logRoot; - private final String daemonLogRoot; - private final ResourceAuthorizer resourceAuthorizer; - private final Integer logviewerPort; - - public LogviewerLogSearchHandler(Map<String, Object> stormConf, String logRoot, String daemonLogRoot, - ResourceAuthorizer resourceAuthorizer) { - this.stormConf = stormConf; - this.logRoot = logRoot; - this.daemonLogRoot = daemonLogRoot; - this.resourceAuthorizer = resourceAuthorizer; - - this.logviewerPort = ObjectReader.getInt(stormConf.get(DaemonConfig.LOGVIEWER_PORT)); - } - - 
public Response searchLogFile(String fileName, String user, boolean isDaemon, String search, - String numMatchesStr, String offsetStr, String callback, String origin) - throws IOException, InvalidRequestException { - String rootDir = isDaemon ? daemonLogRoot : logRoot; - File file = new File(rootDir, fileName).getCanonicalFile(); - Response response; - if (file.exists()) { - if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) { - Integer numMatchesInt = numMatchesStr != null ? tryParseIntParam("num-matches", numMatchesStr) : null; - Integer offsetInt = offsetStr != null ? tryParseIntParam("start-byte-offset", offsetStr) : null; - - try { - if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) { - Map<String, Object> entity = new HashMap<>(); - entity.put("isDaemon", isDaemon ? "yes" : "no"); - entity.putAll(substringSearch(file, search, isDaemon, numMatchesInt, offsetInt)); - - response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin); - } else { - throw new InvalidRequestException("Search substring must be between 1 and 1024 " - + "UTF-8 bytes in size (inclusive)"); - } - } catch (Exception ex) { - response = LogviewerResponseBuilder.buildExceptionJsonResponse(ex, callback); - } - } else { - // unauthorized - response = LogviewerResponseBuilder.buildUnauthorizedUserJsonResponse(user, callback); - } - } else { - // not found - Map<String, String> entity = new HashMap<>(); - entity.put("error", "Not Found"); - entity.put("errorMessage", "The file was not found on this node."); - - response = new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build(); - } - - return response; - } - - public Response deepSearchLogsForTopology(String topologyId, String user, String search, - String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr, - Boolean searchArchived, String callback, String origin) { - String rootDir = logRoot; - Object 
returnValue; - File topologyDir = new File(rootDir + Utils.FILE_PATH_SEPARATOR + topologyId); - if (StringUtils.isEmpty(search) || !topologyDir.exists()) { - returnValue = new ArrayList<>(); - } else { - int fileOffset = ObjectReader.getInt(fileOffsetStr, 0); - int offset = ObjectReader.getInt(offsetStr, 0); - int numMatches = ObjectReader.getInt(numMatchesStr, 1); - - File[] portDirsArray = topologyDir.listFiles(); - List<File> portDirs; - if (portDirsArray != null) { - portDirs = Arrays.asList(portDirsArray); - } else { - portDirs = new ArrayList<>(); - } - - if (StringUtils.isEmpty(portStr) || portStr.equals("*")) { - // check for all ports - List<List<File>> filteredLogs = portDirs.stream() - .map(portDir -> logsForPort(user, portDir)) - .filter(logs -> logs != null && !logs.isEmpty()) - .collect(toList()); - - if (BooleanUtils.isTrue(searchArchived)) { - returnValue = filteredLogs.stream() - .map(fl -> findNMatches(fl, numMatches, 0, 0, search)) - .collect(toList()); - } else { - returnValue = filteredLogs.stream() - .map(fl -> Collections.singletonList(first(fl))) - .map(fl -> findNMatches(fl, numMatches, 0, 0, search)) - .collect(toList()); - } - } else { - int port = Integer.parseInt(portStr); - // check just the one port - List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS, - new ArrayList<>()); - boolean containsPort = slotsPorts.stream() - .anyMatch(slotPort -> slotPort != null && (slotPort == port)); - if (!containsPort) { - returnValue = new ArrayList<>(); - } else { - File portDir = new File(rootDir + Utils.FILE_PATH_SEPARATOR + topologyId + - Utils.FILE_PATH_SEPARATOR + port); - - if (!portDir.exists() || logsForPort(user, portDir).isEmpty()) { - returnValue = new ArrayList<>(); - } else { - List<File> filteredLogs = logsForPort(user, portDir); - if (BooleanUtils.isTrue(searchArchived)) { - returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search); - } else { - returnValue = 
findNMatches(Collections.singletonList(first(filteredLogs)), - numMatches, 0, offset, search); - } - } - } - } - } - - return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin); - } - - private Integer tryParseIntParam(String paramName, String value) throws InvalidRequestException { - try { - return Integer.parseInt(value); - } catch (NumberFormatException e) { - throw new InvalidRequestException("Could not parse " + paramName + " to an integer"); - } - } - - private Map<String,Object> substringSearch(File file, String searchString, int numMatches, int startByteOffset) throws InvalidRequestException { - return substringSearch(file, searchString, false, numMatches, startByteOffset); - } - - private Map<String,Object> substringSearch(File file, String searchString, boolean isDaemon, Integer numMatches, Integer startByteOffset) throws InvalidRequestException { - try { - if (StringUtils.isEmpty(searchString)) { - throw new IllegalArgumentException("Precondition fails: search string should not be empty."); - } - if (searchString.getBytes("UTF-8").length > GREP_MAX_SEARCH_SIZE) { - throw new IllegalArgumentException("Precondition fails: the length of search string should be less than " + GREP_MAX_SEARCH_SIZE); - } - - boolean isZipFile = file.getName().endsWith(".gz"); - FileInputStream fis = new FileInputStream(file); - InputStream gzippedInputStream; - if (isZipFile) { - gzippedInputStream = new GZIPInputStream(fis); - } else { - gzippedInputStream = fis; - } - - BufferedInputStream stream = new BufferedInputStream(gzippedInputStream); - - int fileLength; - if (isZipFile) { - fileLength = (int) ServerUtils.zipFileSize(file); - } else { - fileLength = (int) file.length(); - } - - ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE); - byte[] bufArray = buf.array(); - int totalBytesRead = 0; - byte[] searchBytes = searchString.getBytes("UTF-8"); - numMatches = numMatches != null ? numMatches : 10; - startByteOffset = startByteOffset != null ? 
startByteOffset : 0; - - // Start at the part of the log file we are interested in. - // Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files - if (startByteOffset > fileLength) { - throw new InvalidRequestException("Cannot search past the end of the file"); - } - - if (startByteOffset > 0) { - StreamUtil.skipBytes(stream, startByteOffset); - } - - Arrays.fill(bufArray, (byte) 0); - - int bytesRead = stream.read(bufArray, 0, Math.min((int) fileLength, GREP_BUF_SIZE)); - buf.limit(bytesRead); - totalBytesRead += bytesRead; - - List<Map<String, Object>> initialMatches = new ArrayList<>(); - int initBufOffset = 0; - int byteOffset = startByteOffset; - byte[] beforeBytes = null; - - Map<String, Object> ret = new HashMap<>(); - while (true) { - SubstringSearchResult searchRet = bufferSubstringSearch(isDaemon, file, fileLength, byteOffset, initBufOffset, - stream, startByteOffset, totalBytesRead, buf, searchBytes, initialMatches, numMatches, beforeBytes); - - List<Map<String, Object>> matches = searchRet.getMatches(); - Integer newByteOffset = searchRet.getNewByteOffset(); - byte[] newBeforeBytes = searchRet.getNewBeforeBytes(); - - if (matches.size() < numMatches && totalBytesRead + startByteOffset < fileLength) { - // The start index is positioned to find any possible - // occurrence search string that did not quite fit in the - // buffer on the previous read. - int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length; - - totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, file, fileLength); - if (totalBytesRead < 0) { - throw new InvalidRequestException("Cannot search past the end of the file"); - } - - initialMatches = matches; - initBufOffset = newBufOffset; - byteOffset = newByteOffset; - beforeBytes = newBeforeBytes; - } else { - ret.put("isDaemon", isDaemon ? 
"yes" : "no"); - Integer nextByteOffset = null; - if (matches.size() >= numMatches || totalBytesRead < fileLength) { - nextByteOffset = (Integer) last(matches).get("byteOffset") + searchBytes.length; - if (fileLength <= nextByteOffset) { - nextByteOffset = null; - } - } - ret.putAll(mkGrepResponse(searchBytes, startByteOffset, matches, nextByteOffset)); - break; - } - } - - return ret; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - - /** - * Get the filtered, authorized, sorted log files for a port. - */ - private List<File> logsForPort(String user, File portDir) { - try { - List<File> workerLogs = DirectoryCleaner.getFilesForDir(portDir).stream() - .filter(file -> WORKER_LOG_FILENAME_PATTERN.asPredicate().test(file.getName())) - .collect(toList()); - - return workerLogs.stream() - .filter(log -> resourceAuthorizer.isUserAllowedToAccessFile(user, WorkerLogs.getTopologyPortWorkerLog(log))) - .sorted((f1, f2) -> (int) (f2.lastModified() - f1.lastModified())) - .collect(toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset, String search) { - logs = drop(logs, fileOffset); - - List<Map<String, Object>> matches = new ArrayList<>(); - int matchCount = 0; - - while (true) { - if (logs.isEmpty()) { - break; - } - - File firstLog = logs.get(0); - Map<String, Object> theseMatches; - try { - LOG.debug("Looking through {}", firstLog); - theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset); - } catch (InvalidRequestException e) { - LOG.error("Can't search past end of file.", e); - theseMatches = new HashMap<>(); - } - - String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog); - - List<Map<String, Object>> newMatches = new ArrayList<>(matches); - Map<String, Object> currentFileMatch = new HashMap<>(theseMatches); - currentFileMatch.put("fileName", fileName); - List<String> splitPath; - try { - 
splitPath = Arrays.asList(firstLog.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR)); - } catch (IOException e) { - throw new RuntimeException(e); - } - currentFileMatch.put("port", first(takeLast(splitPath, 2))); - newMatches.add(currentFileMatch); - - int newCount = matchCount + ((List<?>)theseMatches.get("matches")).size(); - - if (theseMatches.isEmpty()) { - // matches and matchCount is not changed - logs = rest(logs); - offset = 0; - fileOffset = fileOffset + 1; - } else if (newCount >= numMatches) { - matches = newMatches; - break; - } else { - matches = newMatches; - logs = rest(logs); - offset = 0; - fileOffset = fileOffset + 1; - matchCount = newCount; - } - } - - return new Matched(fileOffset, search, matches); - } - - - /** - * As the file is read into a buffer, 1/2 the buffer's size at a time, we search the buffer for matches of the - * substring and return a list of zero or more matches. - */ - private SubstringSearchResult bufferSubstringSearch(boolean isDaemon, File file, int fileLength, int offsetToBuf, - int initBufOffset, BufferedInputStream stream, Integer bytesSkipped, - int bytesRead, ByteBuffer haystack, byte[] needle, - List<Map<String, Object>> initialMatches, Integer numMatches, byte[] beforeBytes) throws IOException { - int bufOffset = initBufOffset; - List<Map<String, Object>> matches = initialMatches; - - byte[] newBeforeBytes; - Integer newByteOffset; - - while (true) { - int offset = offsetOfBytes(haystack.array(), needle, bufOffset); - if (matches.size() < numMatches && offset >= 0) { - int fileOffset = offsetToBuf + offset; - int bytesNeededAfterMatch = haystack.limit() - GREP_CONTEXT_SIZE - needle.length; - - byte[] beforeArg = null; - byte[] afterArg = null; - if (offset < GREP_CONTEXT_SIZE) { - beforeArg = beforeBytes; - } - - if (offset > bytesNeededAfterMatch) { - afterArg = tryReadAhead(stream, haystack, offset, fileLength, bytesRead); - } - - bufOffset = offset + needle.length; - matches.add(mkMatchData(needle, haystack, 
offset, fileOffset, - file.getCanonicalPath(), isDaemon, beforeArg, afterArg)); - } else { - int beforeStrToOffset = Math.min(haystack.limit(), GREP_MAX_SEARCH_SIZE); - int beforeStrFromOffset = Math.max(0, beforeStrToOffset - GREP_CONTEXT_SIZE); - newBeforeBytes = Arrays.copyOfRange(haystack.array(), beforeStrFromOffset, beforeStrToOffset); - - // It's OK if new-byte-offset is negative. - // This is normal if we are out of bytes to read from a small file. - if (matches.size() >= numMatches) { - newByteOffset = ((Number) last(matches).get("byteOffset")).intValue() + needle.length; - } else { - newByteOffset = bytesSkipped + bytesRead - GREP_MAX_SEARCH_SIZE; - } - - break; - } - } - - return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes); - } - - private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, File file, - int fileLength) throws IOException { - byte[] bufArray = buf.array(); - - // Copy the 2nd half of the buffer to the first half. - System.arraycopy(bufArray, GREP_MAX_SEARCH_SIZE, bufArray, 0, GREP_MAX_SEARCH_SIZE); - - // Zero-out the 2nd half to prevent accidental matches. - Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0); - - // Fill the 2nd half with new bytes from the stream. 
- int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min((int) fileLength, GREP_MAX_SEARCH_SIZE)); - buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead); - return totalBytesRead + bytesRead; - } - - - private Map<String, Object> mkMatchData(byte[] needle, ByteBuffer haystack, int haystackOffset, int fileOffset, String fname, - boolean isDaemon, byte[] beforeBytes, byte[] afterBytes) - throws UnsupportedEncodingException, UnknownHostException { - String url; - if (isDaemon) { - url = urlToMatchCenteredInLogPageDaemonFile(needle, fname, fileOffset, logviewerPort); - } else { - url = urlToMatchCenteredInLogPage(needle, fname, fileOffset, logviewerPort); - } - - byte[] haystackBytes = haystack.array(); - String beforeString; - String afterString; - - if (haystackOffset >= GREP_CONTEXT_SIZE) { - StringBuilder sb = new StringBuilder(); - sb.append(new String(haystackBytes, (haystackOffset - GREP_CONTEXT_SIZE), GREP_CONTEXT_SIZE, "UTF-8")); - sb.append(new String(haystackBytes, 0, haystackOffset, "UTF-8")); - beforeString = sb.toString(); - } else { - int numDesired = Math.max(0, GREP_CONTEXT_SIZE - haystackOffset); - int beforeSize = beforeBytes != null ? beforeBytes.length : 0; - int numExpected = Math.min(beforeSize, numDesired); - - if (numExpected > 0) { - StringBuilder sb = new StringBuilder(); - sb.append(new String(beforeBytes, numExpected - beforeSize, numExpected, "UTF-8")); - sb.append(new String(haystackBytes, 0, haystackOffset, "UTF-8")); - beforeString = sb.toString(); - } else { - beforeString = new String(haystackBytes, 0, haystackOffset, "UTF-8"); - } - } - - int needleSize = needle.length; - int afterOffset = haystackOffset + needleSize; - int haystackSize = haystack.limit(); - - if ((afterOffset + GREP_CONTEXT_SIZE) < haystackSize) { - afterString = new String(haystackBytes, afterOffset, GREP_CONTEXT_SIZE, "UTF-8"); - } else { - int numDesired = GREP_CONTEXT_SIZE - (haystackSize - afterOffset); - int afterSize = afterBytes != null ? 
afterBytes.length : 0; - int numExpected = Math.min(afterSize, numDesired); - - if (numExpected > 0) { - StringBuilder sb = new StringBuilder(); - sb.append(new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8")); - sb.append(new String(afterBytes, 0, numExpected, "UTF-8")); - afterString = sb.toString(); - } else { - afterString = new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8"); - } - } - - Map<String, Object> ret = new HashMap<>(); - ret.put("byteOffset", fileOffset); - ret.put("beforeString", beforeString); - ret.put("afterString", afterString); - ret.put("matchString", new String(needle, "UTF-8")); - ret.put("logviewerURL", url); - - return ret; - } - - /** - * Tries once to read ahead in the stream to fill the context and - * resets the stream to its position before the call. - */ - private byte[] tryReadAhead(BufferedInputStream stream, ByteBuffer haystack, int offset, int fileLength, int bytesRead) throws IOException { - int numExpected = Math.min(fileLength - bytesRead, GREP_CONTEXT_SIZE); - byte[] afterBytes = new byte[numExpected]; - stream.mark(numExpected); - // Only try reading once. - stream.read(afterBytes, 0, numExpected); - stream.reset(); - return afterBytes; - } - - /** - * Searches a given byte array for a match of a sub-array of bytes. - * Returns the offset to the byte that matches, or -1 if no match was found. 
- */ - private int offsetOfBytes(byte[] buffer, byte[] search, int initOffset) { - if (search.length <= 0) { - throw new IllegalArgumentException("Search array should not be empty."); - } - - if (initOffset < 0) { - throw new IllegalArgumentException("Start offset shouldn't be negative."); - } - - int offset = initOffset; - int candidateOffset = initOffset; - int valOffset = 0; - int retOffset = 0; - - while (true) { - if (search.length - valOffset <= 0) { - // found - retOffset = candidateOffset; - break; - } else { - if (offset >= buffer.length) { - // We ran out of buffer for the search. - retOffset = -1; - break; - } else { - if (search[valOffset] != buffer[offset]) { - // The match at this candidate offset failed, so start over with the - // next candidate byte from the buffer. - int newOffset = candidateOffset + 1; - - offset = newOffset; - candidateOffset = newOffset; - valOffset = 0; - } else { - // So far it matches. Keep going... - offset = offset + 1; - valOffset = valOffset + 1; - } - } - } - } - - return retOffset; - } - - /** - * This response data only includes a next byte offset if there is more of the file to read. 
- */ - private Map<String, Object> mkGrepResponse(byte[] searchBytes, Integer offset, List<Map<String, Object>> matches, - Integer nextByteOffset) throws UnsupportedEncodingException { - Map<String, Object> ret = new HashMap<>(); - ret.put("searchString", new String(searchBytes, "UTF-8")); - ret.put("startByteOffset", offset); - ret.put("matches", matches); - if (nextByteOffset != null) { - ret.put("nextByteOffset", nextByteOffset); - } - return ret; - } - - private String urlToMatchCenteredInLogPage(byte[] needle, String fname, int offset, Integer port) throws UnknownHostException { - String host = Utils.hostname(); - String splittedFileName = String.join(Utils.FILE_PATH_SEPARATOR, - takeLast(Arrays.asList(fname.split(Utils.FILE_PATH_SEPARATOR)), 3)); - - Map<String, Object> parameters = new HashMap<>(); - parameters.put("file", splittedFileName); - parameters.put("start", Math.max(0, offset - (DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2))); - parameters.put("length", DEFAULT_BYTES_PER_PAGE); - - return URLBuilder.build(String.format("http://%s:%d/api/v1/log", host, port), parameters); - } - - private String urlToMatchCenteredInLogPageDaemonFile(byte[] needle, String fname, int offset, Integer port) throws UnknownHostException { - String host = Utils.hostname(); - String splittedFileName = String.join(Utils.FILE_PATH_SEPARATOR, - takeLast(Arrays.asList(fname.split(Utils.FILE_PATH_SEPARATOR)), 1)); - - Map<String, Object> parameters = new HashMap<>(); - parameters.put("file", splittedFileName); - parameters.put("start", Math.max(0, offset - (DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2))); - parameters.put("length", DEFAULT_BYTES_PER_PAGE); - - return URLBuilder.build(String.format("http://%s:%d/api/v1/daemonlog", host, port), parameters); - } - - private static class Matched implements JSONAware { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private int fileOffset; - private String searchString; - private List<Map<String, 
Object>> matches; - - public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches) { - this.fileOffset = fileOffset; - this.searchString = searchString; - this.matches = matches; - } - - public int getFileOffset() { - return fileOffset; - } - - public String getSearchString() { - return searchString; - } - - public List<Map<String, Object>> getMatches() { - return matches; - } - - @Override - public String toJSONString() { - try { - return OBJECT_MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - } - - private static class SubstringSearchResult { - private List<Map<String, Object>> matches; - private Integer newByteOffset; - private byte[] newBeforeBytes; - - public SubstringSearchResult(List<Map<String, Object>> matches, Integer newByteOffset, byte[] newBeforeBytes) { - this.matches = matches; - this.newByteOffset = newByteOffset; - this.newBeforeBytes = newBeforeBytes; - } - - public List<Map<String, Object>> getMatches() { - return matches; - } - - public Integer getNewByteOffset() { - return newByteOffset; - } - - public byte[] getNewBeforeBytes() { - return newBeforeBytes; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java deleted file mode 100644 index 1cc274c..0000000 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.wip.logviewer.handler;

import j2html.tags.DomContent;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.daemon.DirectoryCleaner;
import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder;
import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer;
import org.apache.storm.utils.ServerUtils;

import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.util.List;

import static j2html.TagCreator.a;
import static j2html.TagCreator.body;
import static j2html.TagCreator.head;
import static j2html.TagCreator.html;
import static j2html.TagCreator.li;
import static j2html.TagCreator.link;
import static j2html.TagCreator.title;
import static j2html.TagCreator.ul;
import static java.util.stream.Collectors.toList;

/**
 * Lists and serves the profiler dump files found in a worker's directory,
 * {@code <logRoot>/<topologyId>/<port>}.
 *
 * <p>Access is granted when {@link ResourceAuthorizer#isUserAllowedToAccessFile} accepts the
 * user for the {@code worker.log} path of the same worker directory.
 */
public class LogviewerProfileHandler {

    public static final String WORKER_LOG_FILENAME = "worker.log";

    private final String logRoot;
    private final ResourceAuthorizer resourceAuthorizer;

    /**
     * Creates a handler.
     *
     * @param logRoot root directory under which the per-topology worker directories are looked up
     * @param resourceAuthorizer used to decide whether a user may access a worker's files
     */
    public LogviewerProfileHandler(String logRoot, ResourceAuthorizer resourceAuthorizer) {
        this.logRoot = logRoot;
        this.resourceAuthorizer = resourceAuthorizer;
    }

    /**
     * Renders an HTML page linking to every dump file in the worker's directory.
     *
     * @param topologyId topology id, used as the first directory level below the log root
     * @param hostPort worker address in {@code host:port} form; only the port part selects the directory
     * @param user name of the requesting user
     * @return the HTML page; a page-not-found response when {@code hostPort} has no port part, the
     *     directory does not exist, or it resolves outside the log root; an unauthorized response
     *     when the user is rejected by the authorizer
     * @throws IOException if the directory cannot be listed
     */
    public Response listDumpFiles(String topologyId, String hostPort, String user) throws IOException {
        String[] hostPortParts = hostPort.split(":");
        if (hostPortParts.length < 2) {
            // No port part: there is no worker directory to look at.
            return LogviewerResponseBuilder.buildResponsePageNotFound();
        }
        String portStr = hostPortParts[1];

        File dir = workerDir(topologyId, portStr);
        // topologyId and the port come from the request; never leave the log root.
        if (!dir.exists() || !isInside(new File(logRoot), dir)) {
            return LogviewerResponseBuilder.buildResponsePageNotFound();
        }

        if (!resourceAuthorizer.isUserAllowedToAccessFile(user, workerLogRelativePath(topologyId, portStr))) {
            return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
        }

        String content = buildDumpFileListPage(topologyId, hostPort, dir);
        return LogviewerResponseBuilder.buildSuccessHtmlResponse(content);
    }

    /**
     * Sends one dump file of the worker as a download.
     *
     * @param topologyId topology id, used as the first directory level below the log root
     * @param hostPort worker address in {@code host:port} form; only the port part selects the directory
     * @param fileName name of the file inside the worker's directory
     * @param user name of the requesting user
     * @return the download response; a page-not-found response when {@code hostPort} has no port
     *     part, the directory or file does not exist, or either one resolves outside its expected
     *     parent (for example a {@code fileName} containing {@code ..}); an unauthorized response
     *     when the user is rejected by the authorizer
     * @throws IOException if building the download response fails
     */
    public Response downloadDumpFile(String topologyId, String hostPort, String fileName, String user)
            throws IOException {
        String[] hostPortParts = hostPort.split(":");
        if (hostPortParts.length < 2) {
            // No port part: there is no worker directory to look at.
            return LogviewerResponseBuilder.buildResponsePageNotFound();
        }
        String portStr = hostPortParts[1];

        File dir = workerDir(topologyId, portStr);
        File file = new File(dir, fileName);
        if (!dir.exists() || !file.exists()) {
            return LogviewerResponseBuilder.buildResponsePageNotFound();
        }

        // The authorization below only covers <topologyId>/<port>/worker.log, so the requested
        // file must really live in that directory; otherwise "../" segments would let a caller
        // read files the check never looked at.
        if (!isInside(new File(logRoot), dir) || !isInside(dir, file)) {
            return LogviewerResponseBuilder.buildResponsePageNotFound();
        }

        if (!resourceAuthorizer.isUserAllowedToAccessFile(user, workerLogRelativePath(topologyId, portStr))) {
            return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
        }

        return LogviewerResponseBuilder.buildDownloadFile(file);
    }

    /** Builds the HTML page with one download link per dump file found in {@code dir}. */
    private String buildDumpFileListPage(String topologyId, String hostPort, File dir) throws IOException {
        List<DomContent> liTags = getProfilerDumpFiles(dir).stream()
                .map(file -> li(a(file).withHref("/api/v1/dumps/" + topologyId + "/" + hostPort + "/" + file)))
                .collect(toList());

        return html(
                head(
                        title("File Dumps - Storm Log Viewer"),
                        link().withRel("stylesheet").withHref("/css/bootstrap-3.3.1.min.css"),
                        link().withRel("stylesheet").withHref("/css/jquery.dataTables.1.10.4.min.css"),
                        link().withRel("stylesheet").withHref("/css/style.css")
                ),
                body(
                        ul(liTags.toArray(new DomContent[0]))
                )
        ).render();
    }

    /** Returns the names of the files in {@code dir} that end in {@code .txt}, {@code .jfr} or {@code .bin}. */
    private List<String> getProfilerDumpFiles(File dir) throws IOException {
        List<File> filesForDir = DirectoryCleaner.getFilesForDir(dir);
        return filesForDir.stream()
                .map(File::getName)
                .filter(LogviewerProfileHandler::isDumpFileName)
                .collect(toList());
    }

    /** Tells whether {@code fileName} is non-empty and carries one of the dump file extensions. */
    private static boolean isDumpFileName(String fileName) {
        return StringUtils.isNotEmpty(fileName)
                && (fileName.endsWith(".txt") || fileName.endsWith(".jfr") || fileName.endsWith(".bin"));
    }

    /** Returns {@code <logRoot>/<topologyId>/<portStr>}. */
    private File workerDir(String topologyId, String portStr) {
        return new File(String.join(ServerUtils.FILE_PATH_SEPARATOR, logRoot, topologyId, portStr));
    }

    /** Returns the path, relative to the log root, that is handed to the authorizer. */
    private static String workerLogRelativePath(String topologyId, String portStr) {
        return String.join(ServerUtils.FILE_PATH_SEPARATOR, topologyId, portStr, WORKER_LOG_FILENAME);
    }

    /**
     * Tells whether {@code child} is {@code parent} itself or located below it once {@code .} and
     * {@code ..} segments are removed. The comparison is purely lexical (symbolic links are not
     * resolved) and component-wise, so {@code /logs-old} is not considered inside {@code /logs}.
     */
    private static boolean isInside(File parent, File child) {
        return child.toPath().toAbsolutePath().normalize()
                .startsWith(parent.toPath().toAbsolutePath().normalize());
    }

}
