http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java new file mode 100644 index 0000000..c0a69cb --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java @@ -0,0 +1,686 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.wip.logviewer.handler; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.DaemonConfig; +import org.apache.storm.daemon.DirectoryCleaner; +import org.apache.storm.daemon.common.JsonResponseBuilder; +import org.apache.storm.daemon.utils.StreamUtil; +import org.apache.storm.daemon.utils.URLBuilder; +import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder; +import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; +import org.apache.storm.daemon.wip.logviewer.utils.WorkerLogs; +import org.apache.storm.ui.InvalidRequestException; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ServerUtils; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; + +import static java.util.stream.Collectors.toList; +import static org.apache.storm.daemon.utils.ListFunctionalSupport.drop; +import static org.apache.storm.daemon.utils.ListFunctionalSupport.first; +import static org.apache.storm.daemon.utils.ListFunctionalSupport.last; +import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest; +import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast; +import static 
org.apache.storm.daemon.wip.logviewer.LogviewerConstant.DEFAULT_BYTES_PER_PAGE; + +public class LogviewerLogSearchHandler { + private final static Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class); + + public static final int GREP_MAX_SEARCH_SIZE = 1024; + public static final int GREP_BUF_SIZE = 2048; + public static final int GREP_CONTEXT_SIZE = 128; + public static final Pattern WORKER_LOG_FILENAME_PATTERN = Pattern.compile("^worker.log(.*)"); + + private final Map<String, Object> stormConf; + private final String logRoot; + private final String daemonLogRoot; + private final ResourceAuthorizer resourceAuthorizer; + private final Integer logviewerPort; + + public LogviewerLogSearchHandler(Map<String, Object> stormConf, String logRoot, String daemonLogRoot, + ResourceAuthorizer resourceAuthorizer) { + this.stormConf = stormConf; + this.logRoot = logRoot; + this.daemonLogRoot = daemonLogRoot; + this.resourceAuthorizer = resourceAuthorizer; + + this.logviewerPort = ObjectReader.getInt(stormConf.get(DaemonConfig.LOGVIEWER_PORT)); + } + + public Response searchLogFile(String fileName, String user, boolean isDaemon, String search, + String numMatchesStr, String offsetStr, String callback, String origin) + throws IOException, InvalidRequestException { + String rootDir = isDaemon ? daemonLogRoot : logRoot; + File file = new File(rootDir, fileName).getCanonicalFile(); + Response response; + if (file.exists()) { + if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) { + Integer numMatchesInt = numMatchesStr != null ? tryParseIntParam("num-matches", numMatchesStr) : null; + Integer offsetInt = offsetStr != null ? tryParseIntParam("start-byte-offset", offsetStr) : null; + + try { + if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) { + Map<String, Object> entity = new HashMap<>(); + entity.put("isDaemon", isDaemon ? 
"yes" : "no"); + entity.putAll(substringSearch(file, search, isDaemon, numMatchesInt, offsetInt)); + + response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin); + } else { + throw new InvalidRequestException("Search substring must be between 1 and 1024 " + + "UTF-8 bytes in size (inclusive)"); + } + } catch (Exception ex) { + response = LogviewerResponseBuilder.buildExceptionJsonResponse(ex, callback); + } + } else { + // unauthorized + response = LogviewerResponseBuilder.buildUnauthorizedUserJsonResponse(user, callback); + } + } else { + // not found + Map<String, String> entity = new HashMap<>(); + entity.put("error", "Not Found"); + entity.put("errorMessage", "The file was not found on this node."); + + response = new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build(); + } + + return response; + } + + public Response deepSearchLogsForTopology(String topologyId, String user, String search, + String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr, + Boolean searchArchived, String callback, String origin) { + String rootDir = logRoot; + Object returnValue; + File topologyDir = new File(rootDir + Utils.FILE_PATH_SEPARATOR + topologyId); + if (StringUtils.isEmpty(search) || !topologyDir.exists()) { + returnValue = new ArrayList<>(); + } else { + int fileOffset = ObjectReader.getInt(fileOffsetStr, 0); + int offset = ObjectReader.getInt(offsetStr, 0); + int numMatches = ObjectReader.getInt(numMatchesStr, 1); + + File[] portDirsArray = topologyDir.listFiles(); + List<File> portDirs; + if (portDirsArray != null) { + portDirs = Arrays.asList(portDirsArray); + } else { + portDirs = new ArrayList<>(); + } + + if (StringUtils.isEmpty(portStr) || portStr.equals("*")) { + // check for all ports + List<List<File>> filteredLogs = portDirs.stream() + .map(portDir -> logsForPort(user, portDir)) + .filter(logs -> logs != null && !logs.isEmpty()) + .collect(toList()); + + if 
(BooleanUtils.isTrue(searchArchived)) { + returnValue = filteredLogs.stream() + .map(fl -> findNMatches(fl, numMatches, 0, 0, search)) + .collect(toList()); + } else { + returnValue = filteredLogs.stream() + .map(fl -> Collections.singletonList(first(fl))) + .map(fl -> findNMatches(fl, numMatches, 0, 0, search)) + .collect(toList()); + } + } else { + int port = Integer.parseInt(portStr); + // check just the one port + List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS, + new ArrayList<>()); + boolean containsPort = slotsPorts.stream() + .anyMatch(slotPort -> slotPort != null && (slotPort == port)); + if (!containsPort) { + returnValue = new ArrayList<>(); + } else { + File portDir = new File(rootDir + Utils.FILE_PATH_SEPARATOR + topologyId + + Utils.FILE_PATH_SEPARATOR + port); + + if (!portDir.exists() || logsForPort(user, portDir).isEmpty()) { + returnValue = new ArrayList<>(); + } else { + List<File> filteredLogs = logsForPort(user, portDir); + if (BooleanUtils.isTrue(searchArchived)) { + returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search); + } else { + returnValue = findNMatches(Collections.singletonList(first(filteredLogs)), + numMatches, 0, offset, search); + } + } + } + } + } + + return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin); + } + + private Integer tryParseIntParam(String paramName, String value) throws InvalidRequestException { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new InvalidRequestException("Could not parse " + paramName + " to an integer"); + } + } + + private Map<String,Object> substringSearch(File file, String searchString, int numMatches, int startByteOffset) throws InvalidRequestException { + return substringSearch(file, searchString, false, numMatches, startByteOffset); + } + + private Map<String,Object> substringSearch(File file, String searchString, boolean isDaemon, Integer 
numMatches, Integer startByteOffset) throws InvalidRequestException { + try { + if (StringUtils.isEmpty(searchString)) { + throw new IllegalArgumentException("Precondition fails: search string should not be empty."); + } + if (searchString.getBytes("UTF-8").length > GREP_MAX_SEARCH_SIZE) { + throw new IllegalArgumentException("Precondition fails: the length of search string should be less than " + GREP_MAX_SEARCH_SIZE); + } + + boolean isZipFile = file.getName().endsWith(".gz"); + FileInputStream fis = new FileInputStream(file); + InputStream gzippedInputStream; + if (isZipFile) { + gzippedInputStream = new GZIPInputStream(fis); + } else { + gzippedInputStream = fis; + } + + BufferedInputStream stream = new BufferedInputStream(gzippedInputStream); + + int fileLength; + if (isZipFile) { + fileLength = (int) ServerUtils.zipFileSize(file); + } else { + fileLength = (int) file.length(); + } + + ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE); + byte[] bufArray = buf.array(); + int totalBytesRead = 0; + byte[] searchBytes = searchString.getBytes("UTF-8"); + numMatches = numMatches != null ? numMatches : 10; + startByteOffset = startByteOffset != null ? startByteOffset : 0; + + // Start at the part of the log file we are interested in. 
+ // Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files + if (startByteOffset > fileLength) { + throw new InvalidRequestException("Cannot search past the end of the file"); + } + + if (startByteOffset > 0) { + StreamUtil.skipBytes(stream, startByteOffset); + } + + Arrays.fill(bufArray, (byte) 0); + + int bytesRead = stream.read(bufArray, 0, Math.min((int) fileLength, GREP_BUF_SIZE)); + buf.limit(bytesRead); + totalBytesRead += bytesRead; + + List<Map<String, Object>> initialMatches = new ArrayList<>(); + int initBufOffset = 0; + int byteOffset = startByteOffset; + byte[] beforeBytes = null; + + Map<String, Object> ret = new HashMap<>(); + while (true) { + SubstringSearchResult searchRet = bufferSubstringSearch(isDaemon, file, fileLength, byteOffset, initBufOffset, + stream, startByteOffset, totalBytesRead, buf, searchBytes, initialMatches, numMatches, beforeBytes); + + List<Map<String, Object>> matches = searchRet.getMatches(); + Integer newByteOffset = searchRet.getNewByteOffset(); + byte[] newBeforeBytes = searchRet.getNewBeforeBytes(); + + if (matches.size() < numMatches && totalBytesRead + startByteOffset < fileLength) { + // The start index is positioned to find any possible + // occurrence search string that did not quite fit in the + // buffer on the previous read. + int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length; + + totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, file, fileLength); + if (totalBytesRead < 0) { + throw new InvalidRequestException("Cannot search past the end of the file"); + } + + initialMatches = matches; + initBufOffset = newBufOffset; + byteOffset = newByteOffset; + beforeBytes = newBeforeBytes; + } else { + ret.put("isDaemon", isDaemon ? 
"yes" : "no"); + Integer nextByteOffset = null; + if (matches.size() >= numMatches || totalBytesRead < fileLength) { + nextByteOffset = (Integer) last(matches).get("byteOffset") + searchBytes.length; + if (fileLength <= nextByteOffset) { + nextByteOffset = null; + } + } + ret.putAll(mkGrepResponse(searchBytes, startByteOffset, matches, nextByteOffset)); + break; + } + } + + return ret; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + /** + * Get the filtered, authorized, sorted log files for a port. + */ + private List<File> logsForPort(String user, File portDir) { + try { + List<File> workerLogs = DirectoryCleaner.getFilesForDir(portDir).stream() + .filter(file -> WORKER_LOG_FILENAME_PATTERN.asPredicate().test(file.getName())) + .collect(toList()); + + return workerLogs.stream() + .filter(log -> resourceAuthorizer.isUserAllowedToAccessFile(user, WorkerLogs.getTopologyPortWorkerLog(log))) + .sorted((f1, f2) -> (int) (f2.lastModified() - f1.lastModified())) + .collect(toList()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset, String search) { + logs = drop(logs, fileOffset); + + List<Map<String, Object>> matches = new ArrayList<>(); + int matchCount = 0; + + while (true) { + if (logs.isEmpty()) { + break; + } + + File firstLog = logs.get(0); + Map<String, Object> theseMatches; + try { + LOG.debug("Looking through {}", firstLog); + theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset); + } catch (InvalidRequestException e) { + LOG.error("Can't search past end of file.", e); + theseMatches = new HashMap<>(); + } + + String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog); + + List<Map<String, Object>> newMatches = new ArrayList<>(matches); + Map<String, Object> currentFileMatch = new HashMap<>(theseMatches); + currentFileMatch.put("fileName", fileName); + List<String> splitPath; + try { + 
splitPath = Arrays.asList(firstLog.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR)); + } catch (IOException e) { + throw new RuntimeException(e); + } + currentFileMatch.put("port", first(takeLast(splitPath, 2))); + newMatches.add(currentFileMatch); + + int newCount = matchCount + ((List<?>)theseMatches.get("matches")).size(); + + if (theseMatches.isEmpty()) { + // matches and matchCount is not changed + logs = rest(logs); + offset = 0; + fileOffset = fileOffset + 1; + } else if (newCount >= numMatches) { + matches = newMatches; + break; + } else { + matches = newMatches; + logs = rest(logs); + offset = 0; + fileOffset = fileOffset + 1; + matchCount = newCount; + } + } + + return new Matched(fileOffset, search, matches); + } + + + /** + * As the file is read into a buffer, 1/2 the buffer's size at a time, we search the buffer for matches of the + * substring and return a list of zero or more matches. + */ + private SubstringSearchResult bufferSubstringSearch(boolean isDaemon, File file, int fileLength, int offsetToBuf, + int initBufOffset, BufferedInputStream stream, Integer bytesSkipped, + int bytesRead, ByteBuffer haystack, byte[] needle, + List<Map<String, Object>> initialMatches, Integer numMatches, byte[] beforeBytes) throws IOException { + int bufOffset = initBufOffset; + List<Map<String, Object>> matches = initialMatches; + + byte[] newBeforeBytes; + Integer newByteOffset; + + while (true) { + int offset = offsetOfBytes(haystack.array(), needle, bufOffset); + if (matches.size() < numMatches && offset >= 0) { + int fileOffset = offsetToBuf + offset; + int bytesNeededAfterMatch = haystack.limit() - GREP_CONTEXT_SIZE - needle.length; + + byte[] beforeArg = null; + byte[] afterArg = null; + if (offset < GREP_CONTEXT_SIZE) { + beforeArg = beforeBytes; + } + + if (offset > bytesNeededAfterMatch) { + afterArg = tryReadAhead(stream, haystack, offset, fileLength, bytesRead); + } + + bufOffset = offset + needle.length; + matches.add(mkMatchData(needle, haystack, 
offset, fileOffset, + file.getCanonicalPath(), isDaemon, beforeArg, afterArg)); + } else { + int beforeStrToOffset = Math.min(haystack.limit(), GREP_MAX_SEARCH_SIZE); + int beforeStrFromOffset = Math.max(0, beforeStrToOffset - GREP_CONTEXT_SIZE); + newBeforeBytes = Arrays.copyOfRange(haystack.array(), beforeStrFromOffset, beforeStrToOffset); + + // It's OK if new-byte-offset is negative. + // This is normal if we are out of bytes to read from a small file. + if (matches.size() >= numMatches) { + newByteOffset = ((Number) last(matches).get("byteOffset")).intValue() + needle.length; + } else { + newByteOffset = bytesSkipped + bytesRead - GREP_MAX_SEARCH_SIZE; + } + + break; + } + } + + return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes); + } + + private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, File file, + int fileLength) throws IOException { + byte[] bufArray = buf.array(); + + // Copy the 2nd half of the buffer to the first half. + System.arraycopy(bufArray, GREP_MAX_SEARCH_SIZE, bufArray, 0, GREP_MAX_SEARCH_SIZE); + + // Zero-out the 2nd half to prevent accidental matches. + Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0); + + // Fill the 2nd half with new bytes from the stream. 
+ int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min((int) fileLength, GREP_MAX_SEARCH_SIZE)); + buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead); + return totalBytesRead + bytesRead; + } + + + private Map<String, Object> mkMatchData(byte[] needle, ByteBuffer haystack, int haystackOffset, int fileOffset, String fname, + boolean isDaemon, byte[] beforeBytes, byte[] afterBytes) + throws UnsupportedEncodingException, UnknownHostException { + String url; + if (isDaemon) { + url = urlToMatchCenteredInLogPageDaemonFile(needle, fname, fileOffset, logviewerPort); + } else { + url = urlToMatchCenteredInLogPage(needle, fname, fileOffset, logviewerPort); + } + + byte[] haystackBytes = haystack.array(); + String beforeString; + String afterString; + + if (haystackOffset >= GREP_CONTEXT_SIZE) { + StringBuilder sb = new StringBuilder(); + sb.append(new String(haystackBytes, (haystackOffset - GREP_CONTEXT_SIZE), GREP_CONTEXT_SIZE, "UTF-8")); + sb.append(new String(haystackBytes, 0, haystackOffset, "UTF-8")); + beforeString = sb.toString(); + } else { + int numDesired = Math.max(0, GREP_CONTEXT_SIZE - haystackOffset); + int beforeSize = beforeBytes != null ? beforeBytes.length : 0; + int numExpected = Math.min(beforeSize, numDesired); + + if (numExpected > 0) { + StringBuilder sb = new StringBuilder(); + sb.append(new String(beforeBytes, numExpected - beforeSize, numExpected, "UTF-8")); + sb.append(new String(haystackBytes, 0, haystackOffset, "UTF-8")); + beforeString = sb.toString(); + } else { + beforeString = new String(haystackBytes, 0, haystackOffset, "UTF-8"); + } + } + + int needleSize = needle.length; + int afterOffset = haystackOffset + needleSize; + int haystackSize = haystack.limit(); + + if ((afterOffset + GREP_CONTEXT_SIZE) < haystackSize) { + afterString = new String(haystackBytes, afterOffset, GREP_CONTEXT_SIZE, "UTF-8"); + } else { + int numDesired = GREP_CONTEXT_SIZE - (haystackSize - afterOffset); + int afterSize = afterBytes != null ? 
afterBytes.length : 0; + int numExpected = Math.min(afterSize, numDesired); + + if (numExpected > 0) { + StringBuilder sb = new StringBuilder(); + sb.append(new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8")); + sb.append(new String(afterBytes, 0, numExpected, "UTF-8")); + afterString = sb.toString(); + } else { + afterString = new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8"); + } + } + + Map<String, Object> ret = new HashMap<>(); + ret.put("byteOffset", fileOffset); + ret.put("beforeString", beforeString); + ret.put("afterString", afterString); + ret.put("matchString", new String(needle, "UTF-8")); + ret.put("logviewerURL", url); + + return ret; + } + + /** + * Tries once to read ahead in the stream to fill the context and + * resets the stream to its position before the call. + */ + private byte[] tryReadAhead(BufferedInputStream stream, ByteBuffer haystack, int offset, int fileLength, int bytesRead) throws IOException { + int numExpected = Math.min(fileLength - bytesRead, GREP_CONTEXT_SIZE); + byte[] afterBytes = new byte[numExpected]; + stream.mark(numExpected); + // Only try reading once. + stream.read(afterBytes, 0, numExpected); + stream.reset(); + return afterBytes; + } + + /** + * Searches a given byte array for a match of a sub-array of bytes. + * Returns the offset to the byte that matches, or -1 if no match was found. 
+ */ + private int offsetOfBytes(byte[] buffer, byte[] search, int initOffset) { + if (search.length <= 0) { + throw new IllegalArgumentException("Search array should not be empty."); + } + + if (initOffset < 0) { + throw new IllegalArgumentException("Start offset shouldn't be negative."); + } + + int offset = initOffset; + int candidateOffset = initOffset; + int valOffset = 0; + int retOffset = 0; + + while (true) { + if (search.length - valOffset <= 0) { + // found + retOffset = candidateOffset; + break; + } else { + if (offset >= buffer.length) { + // We ran out of buffer for the search. + retOffset = -1; + break; + } else { + if (search[valOffset] != buffer[offset]) { + // The match at this candidate offset failed, so start over with the + // next candidate byte from the buffer. + int newOffset = candidateOffset + 1; + + offset = newOffset; + candidateOffset = newOffset; + valOffset = 0; + } else { + // So far it matches. Keep going... + offset = offset + 1; + valOffset = valOffset + 1; + } + } + } + } + + return retOffset; + } + + /** + * This response data only includes a next byte offset if there is more of the file to read. 
+ */ + private Map<String, Object> mkGrepResponse(byte[] searchBytes, Integer offset, List<Map<String, Object>> matches, + Integer nextByteOffset) throws UnsupportedEncodingException { + Map<String, Object> ret = new HashMap<>(); + ret.put("searchString", new String(searchBytes, "UTF-8")); + ret.put("startByteOffset", offset); + ret.put("matches", matches); + if (nextByteOffset != null) { + ret.put("nextByteOffset", nextByteOffset); + } + return ret; + } + + private String urlToMatchCenteredInLogPage(byte[] needle, String fname, int offset, Integer port) throws UnknownHostException { + String host = Utils.hostname(); + String splittedFileName = String.join(Utils.FILE_PATH_SEPARATOR, + takeLast(Arrays.asList(fname.split(Utils.FILE_PATH_SEPARATOR)), 3)); + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("file", splittedFileName); + parameters.put("start", Math.max(0, offset - (DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2))); + parameters.put("length", DEFAULT_BYTES_PER_PAGE); + + return URLBuilder.build(String.format("http://%s:%d/api/v1/log", host, port), parameters); + } + + private String urlToMatchCenteredInLogPageDaemonFile(byte[] needle, String fname, int offset, Integer port) throws UnknownHostException { + String host = Utils.hostname(); + String splittedFileName = String.join(Utils.FILE_PATH_SEPARATOR, + takeLast(Arrays.asList(fname.split(Utils.FILE_PATH_SEPARATOR)), 1)); + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("file", splittedFileName); + parameters.put("start", Math.max(0, offset - (DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2))); + parameters.put("length", DEFAULT_BYTES_PER_PAGE); + + return URLBuilder.build(String.format("http://%s:%d/api/v1/daemonlog", host, port), parameters); + } + + private static class Matched implements JSONAware { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private int fileOffset; + private String searchString; + private List<Map<String, 
Object>> matches; + + public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches) { + this.fileOffset = fileOffset; + this.searchString = searchString; + this.matches = matches; + } + + public int getFileOffset() { + return fileOffset; + } + + public String getSearchString() { + return searchString; + } + + public List<Map<String, Object>> getMatches() { + return matches; + } + + @Override + public String toJSONString() { + try { + return OBJECT_MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + } + + private static class SubstringSearchResult { + private List<Map<String, Object>> matches; + private Integer newByteOffset; + private byte[] newBeforeBytes; + + public SubstringSearchResult(List<Map<String, Object>> matches, Integer newByteOffset, byte[] newBeforeBytes) { + this.matches = matches; + this.newByteOffset = newByteOffset; + this.newBeforeBytes = newBeforeBytes; + } + + public List<Map<String, Object>> getMatches() { + return matches; + } + + public Integer getNewByteOffset() { + return newByteOffset; + } + + public byte[] getNewBeforeBytes() { + return newBeforeBytes; + } + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java new file mode 100644 index 0000000..1cc274c --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.wip.logviewer.handler; + +import j2html.tags.DomContent; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.daemon.DirectoryCleaner; +import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder; +import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; +import org.apache.storm.utils.ServerUtils; + +import javax.ws.rs.core.Response; +import java.io.File; +import java.io.IOException; +import java.util.List; + +import static j2html.TagCreator.a; +import static j2html.TagCreator.body; +import static j2html.TagCreator.head; +import static j2html.TagCreator.html; +import static j2html.TagCreator.li; +import static j2html.TagCreator.link; +import static j2html.TagCreator.title; +import static j2html.TagCreator.ul; +import static java.util.stream.Collectors.toList; + +public class LogviewerProfileHandler { + + public static final String WORKER_LOG_FILENAME = "worker.log"; + private final String logRoot; + private final ResourceAuthorizer resourceAuthorizer; + + public LogviewerProfileHandler(String logRoot, ResourceAuthorizer resourceAuthorizer) { + this.logRoot = logRoot; + this.resourceAuthorizer = resourceAuthorizer; + } + + public Response listDumpFiles(String topologyId, String hostPort, String user) throws IOException { + String portStr = hostPort.split(":")[1]; + File dir = new File(String.join(ServerUtils.FILE_PATH_SEPARATOR, logRoot, topologyId, portStr)); + + if (dir.exists()) { + String workerFileRelativePath = String.join(ServerUtils.FILE_PATH_SEPARATOR, topologyId, portStr, WORKER_LOG_FILENAME); + if (resourceAuthorizer.isUserAllowedToAccessFile(user, workerFileRelativePath)) { + String content = buildDumpFileListPage(topologyId, hostPort, dir); + return LogviewerResponseBuilder.buildSuccessHtmlResponse(content); + } else { + return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user); + } + } else { + return LogviewerResponseBuilder.buildResponsePageNotFound(); + 
} + } + + private String buildDumpFileListPage(String topologyId, String hostPort, File dir) throws IOException { + List<DomContent> liTags = getProfilerDumpFiles(dir).stream() + .map(file -> li(a(file).withHref("/api/v1/dumps/" + topologyId + "/" + hostPort + "/" + file))) + .collect(toList()); + + return html( + head( + title("File Dumps - Storm Log Viewer"), + link().withRel("stylesheet").withHref("/css/bootstrap-3.3.1.min.css"), + link().withRel("stylesheet").withHref("/css/jquery.dataTables.1.10.4.min.css"), + link().withRel("stylesheet").withHref("/css/style.css") + ), + body( + ul(liTags.toArray(new DomContent[]{})) + ) + ).render(); + } + + public Response downloadDumpFile(String topologyId, String hostPort, String fileName, String user) throws IOException { + String portStr = hostPort.split(":")[1]; + File dir = new File(String.join(ServerUtils.FILE_PATH_SEPARATOR, logRoot, topologyId, portStr)); + File file = new File(dir, fileName); + + if (dir.exists() && file.exists()) { + String workerFileRelativePath = String.join(ServerUtils.FILE_PATH_SEPARATOR, topologyId, portStr, WORKER_LOG_FILENAME); + if (resourceAuthorizer.isUserAllowedToAccessFile(user, workerFileRelativePath)) { + return LogviewerResponseBuilder.buildDownloadFile(file); + } else { + return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user); + } + } else { + return LogviewerResponseBuilder.buildResponsePageNotFound(); + } + } + + private List<String> getProfilerDumpFiles(File dir) throws IOException { + List<File> filesForDir = DirectoryCleaner.getFilesForDir(dir); + return filesForDir.stream().filter(file -> { + String fileName = file.getName(); + return StringUtils.isNotEmpty(fileName) + && (fileName.endsWith(".txt") || fileName.endsWith(".jfr") || fileName.endsWith(".bin")); + }).map(File::getName).collect(toList()); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java 
---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java new file mode 100644 index 0000000..6aa1050 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.wip.logviewer.utils; + +import org.apache.commons.io.IOUtils; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.DirectoryCleaner; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.jooq.lambda.Unchecked; +import org.jooq.lambda.tuple.Tuple2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.function.BinaryOperator; +import java.util.stream.StreamSupport; + +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toMap; +import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS; +import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS; +import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB; +import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB; + +public class LogCleaner implements Runnable, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); + public static final String WORKER_YAML = "worker.yaml"; + + private final Map<String, Object> stormConf; + private final Integer intervalSecs; + private final String logRootDir; + private StormTimer logviewerCleanupTimer; + private final long 
maxSumWorkerLogsSizeMb; + private long maxPerWorkerLogsSizeMb; + + public LogCleaner(Map<String, Object> stormConf) { + String logRootDir = ConfigUtils.workerArtifactsRoot(stormConf); + + this.stormConf = stormConf; + this.intervalSecs = ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_INTERVAL_SECS), null); + this.logRootDir = logRootDir; + + maxSumWorkerLogsSizeMb = ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB)); + maxPerWorkerLogsSizeMb = ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB)); + maxPerWorkerLogsSizeMb = Math.min(maxPerWorkerLogsSizeMb, (long) (maxSumWorkerLogsSizeMb * 0.5)); + + LOG.info("configured max total size of worker logs: {} MB, max total size of worker logs per directory: {} MB", + maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb); + } + + public void start() { + if (intervalSecs != null) { + LOG.debug("starting log cleanup thread at interval: {}", intervalSecs); + + logviewerCleanupTimer = new StormTimer("logviewer-cleanup", (t, e) -> { + LOG.error("Error when doing logs cleanup", e); + Utils.exitProcess(20, "Error when doing log cleanup"); + }); + + logviewerCleanupTimer.scheduleRecurring(0, intervalSecs, this); + } else { + LOG.warn("The interval for log cleanup is not set. Skip starting log cleanup thread."); + } + + } + + public void close() { + if (logviewerCleanupTimer != null) { + try { + logviewerCleanupTimer.close(); + } catch (Exception ex) { + throw Utils.wrapInRuntime(ex); + } + } + } + + /** + * Delete old log dirs for which the workers are no longer alive. 
+ */ + public void run() { + try { + int nowSecs = Time.currentTimeSecs(); + Set<File> oldLogDirs = selectDirsForCleanup(nowSecs * 1000, logRootDir); + + DirectoryCleaner cleaner = new DirectoryCleaner(); + SortedSet<File> deadWorkerDirs = getDeadWorkerDirs(nowSecs, oldLogDirs); + + LOG.debug("log cleanup: now={} old log dirs {} dead worker dirs {}", nowSecs, + oldLogDirs.stream().map(File::getName).collect(joining(",")), + deadWorkerDirs.stream().map(File::getName).collect(joining(","))); + + deadWorkerDirs.forEach(Unchecked.consumer(dir -> { + String path = dir.getCanonicalPath(); + LOG.info("Cleaning up: Removing {}", path); + + try { + Utils.forceDelete(path); + cleanupEmptyTopoDirectory(dir); + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + } + })); + + perWorkerDirCleanup(new File(logRootDir), maxPerWorkerLogsSizeMb * 1024 * 1024, cleaner); + globalLogCleanup(new File(logRootDir), maxSumWorkerLogsSizeMb * 1024 * 1024, cleaner); + } catch (Exception ex) { + LOG.error("Exception while cleaning up old log.", ex); + } + } + + /** + * Delete the oldest files in each overloaded worker log dir. + */ + private void perWorkerDirCleanup(File rootDir, long size, DirectoryCleaner cleaner) { + WorkerLogs.getAllWorkerDirs(rootDir).forEach(Unchecked.consumer(dir -> { + cleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), size, true, null); + })); + } + + /** + * Delete the oldest files in overloaded worker-artifacts globally. + */ + private void globalLogCleanup(File rootDir, long size, DirectoryCleaner cleaner) throws Exception { + List<File> workerDirs = new ArrayList<>(WorkerLogs.getAllWorkerDirs(rootDir)); + Set<String> aliveWorkerDirs = new HashSet<>(getAliveWorkerDirs(rootDir)); + + cleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs); + } + + /** + * Return a sorted set of java.io.Files that were written by workers that are now active. 
+ */ + private SortedSet<String> getAliveWorkerDirs(File rootDir) throws Exception { + Set<String> aliveIds = getAliveIds(Time.currentTimeSecs()); + Set<File> logDirs = WorkerLogs.getAllWorkerDirs(rootDir); + Map<String, File> idToDir = identifyWorkerLogDirs(logDirs); + + return idToDir.entrySet().stream() + .filter(entry -> aliveIds.contains(entry.getKey())) + .map(Unchecked.function(entry -> entry.getValue().getCanonicalPath())) + .collect(toCollection(TreeSet::new)); + } + + private Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) { + return logDirs.stream().map(Unchecked.function(logDir -> { + Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir); + + return metaFile.map(Unchecked.function(m -> new Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir))) + .orElse(new Tuple2<>("", logDir)); + })).collect(toMap(Tuple2::v1, Tuple2::v2)); + } + + private Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws IOException { + File metaFile = new File(logDir, WORKER_YAML); + if (metaFile.exists()) { + return Optional.of(metaFile); + } else { + LOG.warn("Could not find {} to clean up for {}", metaFile.getCanonicalPath(), logDir); + return Optional.empty(); + } + } + + private String getWorkerIdFromMetadataFile(String metaFile) { + Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile); + return ObjectReader.getString(map.get("worker-id"), null); + } + + private Set<String> getAliveIds(int nowSecs) throws Exception { + return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream() + .filter(entry -> Objects.nonNull(entry.getValue()) + && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf)) + .map(Map.Entry::getKey) + .collect(toCollection(TreeSet::new)); + } + + /** + * Delete the topo dir if it contains zero port dirs. 
+ */ + private void cleanupEmptyTopoDirectory(File dir) throws IOException { + File topoDir = dir.getParentFile(); + if (topoDir.listFiles().length == 0) { + Utils.forceDelete(topoDir.getCanonicalPath()); + } + } + + /** + * Return a sorted set of java.io.Files that were written by workers that are now dead. + */ + private SortedSet<File> getDeadWorkerDirs(int nowSecs, Set<File> logDirs) throws Exception { + if (logDirs.isEmpty()) { + return new TreeSet<>(); + } else { + Set<String> aliveIds = getAliveIds(nowSecs); + Map<String, File> idToDir = identifyWorkerLogDirs(logDirs); + + return idToDir.entrySet().stream() + .filter(entry -> !aliveIds.contains(entry.getKey())) + .map(Map.Entry::getValue) + .collect(toCollection(TreeSet::new)); + } + } + + private Set<File> selectDirsForCleanup(int nowMillis, String rootDir) { + FileFilter fileFilter = mkFileFilterForLogCleanup(nowMillis); + + return Arrays.stream(new File(rootDir).listFiles()) + .flatMap(topoDir -> Arrays.stream(topoDir.listFiles(fileFilter))) + .collect(toCollection(TreeSet::new)); + } + + private FileFilter mkFileFilterForLogCleanup(int nowMillis) { + final int cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis); + return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis; + } + + /** + * Return the last modified time for all log files in a worker's log dir. + * Using stream rather than File.listFiles is to avoid large mem usage + * when a directory has too many files. + */ + private long lastModifiedTimeWorkerLogdir(File logDir) { + Optional<DirectoryStream<Path>> dirStreamOptional = getStreamForDir(logDir); + long dirModified = logDir.lastModified(); + + if (!dirStreamOptional.isPresent()) { + return dirModified; + } + + DirectoryStream<Path> dirStream = dirStreamOptional.get(); + try { + return StreamSupport.stream(dirStream.spliterator(), false) + .reduce(dirModified, (maximum, path) -> { + long curr = path.toFile().lastModified(); + return curr > maximum ? 
curr : maximum; + }, BinaryOperator.maxBy(Long::compareTo)); + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + return dirModified; + } finally { + if (DirectoryStream.class.isInstance(dirStream)) { + IOUtils.closeQuietly(dirStream); + } + } + } + + private Optional<DirectoryStream<Path>> getStreamForDir(File file) { + try { + return Optional.of(Files.newDirectoryStream(file.toPath())); + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + return Optional.empty(); + } + } + + private int cleanupCutoffAgeMillis(int nowMillis) { + return nowMillis - (ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_AGE_MINS))); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java new file mode 100644 index 0000000..dcb2e2c --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.wip.logviewer.utils; + +import javax.ws.rs.core.Response; +import java.io.File; +import java.io.IOException; + +public class LogFileDownloader { + + private final String logRoot; + private final String daemonLogRoot; + private final ResourceAuthorizer resourceAuthorizer; + + public LogFileDownloader(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) { + this.logRoot = logRoot; + this.daemonLogRoot = daemonLogRoot; + this.resourceAuthorizer = resourceAuthorizer; + } + + public Response downloadFile(String fileName, String user, boolean isDaemon) throws IOException { + String rootDir = isDaemon ? daemonLogRoot : logRoot; + File file = new File(rootDir, fileName).getCanonicalFile(); + if (file.exists()) { + if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) { + return LogviewerResponseBuilder.buildDownloadFile(file); + } else { + return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user); + } + } else { + return LogviewerResponseBuilder.buildResponsePageNotFound(); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java new file mode 100644 index 0000000..bc60e09 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.wip.logviewer.utils; + +import com.google.common.io.ByteStreams; +import org.apache.storm.daemon.common.JsonResponseBuilder; +import org.apache.storm.ui.UIHelpers; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import static j2html.TagCreator.body; +import static j2html.TagCreator.h2; +import static javax.ws.rs.core.Response.Status.OK; +import static org.apache.commons.lang.StringEscapeUtils.escapeHtml; + +public class LogviewerResponseBuilder { + + private LogviewerResponseBuilder() { + } + + public static Response buildSuccessHtmlResponse(String content) { + return Response.status(OK).entity(content) + .type(MediaType.TEXT_HTML_TYPE).build(); + } + + public static Response buildSuccessJsonResponse(Object entity, String callback, String origin) { + return new JsonResponseBuilder().setData(entity).setCallback(callback) + .setHeaders(LogviewerResponseBuilder.getHeadersForSuccessResponse(origin)).build(); + } + + public static Response 
buildDownloadFile(File file) throws IOException { + // do not close this InputStream in method: it will be used from jetty server + InputStream is = new FileInputStream(file); + return Response.status(OK) + .entity(wrapWithStreamingOutput(is)) + .type(MediaType.APPLICATION_OCTET_STREAM_TYPE) + .header("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"") + .build(); + } + + public static Response buildResponseUnautohrizedUser(String user) { + String entity = buildUnauthorizedUserHtml(user); + return Response.status(OK) + .entity(entity) + .type(MediaType.TEXT_HTML_TYPE) + .build(); + } + + public static Response buildResponsePageNotFound() { + return Response.status(404) + .entity("Page not found") + .type(MediaType.TEXT_HTML_TYPE) + .build(); + } + + public static Response buildUnauthorizedUserJsonResponse(String user, String callback) { + return new JsonResponseBuilder().setData(UIHelpers.unauthorizedUserJson(user)) + .setCallback(callback).setStatus(401).build(); + } + + public static Response buildExceptionJsonResponse(Exception ex, String callback) { + return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(ex)) + .setCallback(callback).setStatus(500).build(); + } + + private static Map<String, Object> getHeadersForSuccessResponse(String origin) { + Map<String, Object> headers = new HashMap<>(); + headers.put("Access-Control-Allow-Origin", origin); + headers.put("Access-Control-Allow-Credentials", "true"); + return headers; + } + + private static String buildUnauthorizedUserHtml(String user) { + String content = "User '" + escapeHtml(user) + "' is not authorized."; + return body(h2(content)).render(); + } + + private static StreamingOutput wrapWithStreamingOutput(final InputStream inputStream) { + return os -> { + OutputStream wrappedOutputStream = os; + if (!(os instanceof BufferedOutputStream)) { + wrappedOutputStream = new BufferedOutputStream(os); + } + + ByteStreams.copy(inputStream, wrappedOutputStream); + + 
wrappedOutputStream.flush(); + }; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java new file mode 100644 index 0000000..c7a45e7 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.storm.daemon.wip.logviewer.utils;

import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Utils;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.storm.DaemonConfig.UI_FILTER;

/**
 * Decides whether a user may read a given worker log file.
 */
public class ResourceAuthorizer {

    private final Map<String, Object> stormConf;
    private final IGroupMappingServiceProvider groupMappingServiceProvider;

    /**
     * Creates an authorizer and loads the group mapping plugin configured in {@code stormConf}.
     *
     * @param stormConf daemon configuration
     */
    public ResourceAuthorizer(Map<String, Object> stormConf) {
        this.stormConf = stormConf;
        this.groupMappingServiceProvider = AuthUtils.GetGroupMappingServiceProviderPlugin(stormConf);
    }

    /**
     * Checks whether {@code user} may access the log file {@code fileName}.
     * Access is always granted when no UI filter is configured.
     *
     * <p>Mind the parameter order: the file name comes first, the user second.
     *
     * @param fileName log file name used to look up the log metadata file
     * @param user     name of the requesting user
     * @return true when access is allowed
     */
    public boolean isUserAllowedToAccessFile(String fileName, String user) {
        return isUiFilterNotSet() || isAuthorizedLogUser(user, fileName);
    }

    private boolean isUiFilterNotSet() {
        return StringUtils.isBlank(ObjectReader.getString(stormConf.get(UI_FILTER), null));
    }

    /**
     * A user is authorized when listed in the configured log users, the nimbus admins or the
     * user whitelist of the log, or when one of the user's groups is listed in the configured
     * log groups or the group whitelist of the log.
     */
    private boolean isAuthorizedLogUser(String user, String fileName) {
        if (StringUtils.isEmpty(user) || StringUtils.isEmpty(fileName)) {
            return false;
        }

        // Read the metadata file once; a missing whitelist means nobody is authorized.
        LogUserGroupWhitelist whitelist = getLogUserGroupWhitelist(fileName);
        if (whitelist == null) {
            return false;
        }

        List<String> logsUsers = new ArrayList<>();
        logsUsers.addAll(ObjectReader.getStrings(stormConf.get(DaemonConfig.LOGS_USERS)));
        logsUsers.addAll(ObjectReader.getStrings(stormConf.get(Config.NIMBUS_ADMINS)));
        logsUsers.addAll(whitelist.getUserWhitelist());
        if (logsUsers.contains(user)) {
            return true;
        }

        List<String> logsGroups = new ArrayList<>();
        logsGroups.addAll(ObjectReader.getStrings(stormConf.get(DaemonConfig.LOGS_GROUPS)));
        logsGroups.addAll(whitelist.getGroupWhitelist());

        // Authorized as soon as the user shares at least one group with the allowed groups.
        Set<String> groups = getUserGroups(user);
        return !Sets.intersection(groups, new HashSet<>(logsGroups)).isEmpty();
    }

    /**
     * Loads the user and group whitelist from the log metadata file that belongs to {@code fileName}.
     *
     * @param fileName log file name used to locate the metadata file
     * @return the whitelist, or null when the metadata file yields no content
     */
    @SuppressWarnings("unchecked")
    public LogUserGroupWhitelist getLogUserGroupWhitelist(String fileName) {
        File wlFile = ServerConfigUtils.getLogMetaDataFile(fileName);
        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(wlFile.getAbsolutePath());

        if (map == null) {
            return null;
        }

        List<String> logsUsers = ObjectReader.getStrings(map.get(DaemonConfig.LOGS_USERS));
        List<String> logsGroups = ObjectReader.getStrings(map.get(DaemonConfig.LOGS_GROUPS));
        return new LogUserGroupWhitelist(new HashSet<>(logsUsers), new HashSet<>(logsGroups));
    }

    private Set<String> getUserGroups(String user) {
        try {
            if (StringUtils.isEmpty(user)) {
                return new HashSet<>();
            } else {
                return groupMappingServiceProvider.getGroups(user);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Users and groups that are allowed to read one particular log.
     */
    public static class LogUserGroupWhitelist {

        private Set<String> userWhitelist;
        private Set<String> groupWhitelist;

        public LogUserGroupWhitelist(Set<String> userWhitelist, Set<String> groupWhitelist) {
            this.userWhitelist = userWhitelist;
            this.groupWhitelist = groupWhitelist;
        }

        public Set<String> getUserWhitelist() {
            return userWhitelist;
        }

        public Set<String> getGroupWhitelist() {
            return groupWhitelist;
        }

    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java new file mode 100644 index 0000000..c49a4a1 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java @@ -0,0 +1,63 @@ +package org.apache.storm.daemon.wip.logviewer.utils; + +import org.apache.storm.daemon.DirectoryCleaner; +import org.apache.storm.utils.Utils; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toCollection; +import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast; + +public class WorkerLogs { + + private WorkerLogs() { + } + + /** + * Return the path of the worker log with the format of topoId/port/worker.log.* + */ + public static String getTopologyPortWorkerLog(File file) { + try { + String[] splitted = file.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR); + List<String> split = takeLast(Arrays.asList(splitted), 3); + + return String.join(Utils.FILE_PATH_SEPARATOR, split); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static List<File> getAllLogsForRootDir(File logDir) throws IOException { + List<File> files = new ArrayList<>(); + Set<File> topoDirFiles = getAllWorkerDirs(logDir); + if (topoDirFiles != null) { + for (File portDir : topoDirFiles) { + files.addAll(DirectoryCleaner.getFilesForDir(portDir)); + } + } + + return files; + } + + public static Set<File> getAllWorkerDirs(File rootDir) { + File[] rootDirFiles = rootDir.listFiles(); + if (rootDirFiles != null) { + return Arrays.stream(rootDirFiles).flatMap(topoDir -> { + File[] topoFiles = topoDir.listFiles(); + return topoFiles != null ? 
Arrays.stream(topoFiles) : Stream.empty(); + }).collect(toCollection(TreeSet::new)); + } + + return new TreeSet<>(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java new file mode 100644 index 0000000..c1b4bc7 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.wip.logviewer.webapp; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.RollingFileAppender; +import org.apache.storm.daemon.common.AuthorizationExceptionMapper; +import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogDownloadHandler; +import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogPageHandler; +import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogSearchHandler; +import org.apache.storm.daemon.wip.logviewer.handler.LogviewerProfileHandler; +import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IHttpCredentialsPlugin; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.ObjectReader; + +import javax.ws.rs.ApplicationPath; +import javax.ws.rs.core.Application; +import java.io.File; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.storm.DaemonConfig.LOGVIEWER_APPENDER_NAME; + +@ApplicationPath("") +public class LogviewerApplication extends Application { + private static Map<String, Object> stormConf; + private final Set<Object> singletons = new HashSet<Object>(); + + /** + * Constructor. 
+ */ + public LogviewerApplication() { + String logRoot = ConfigUtils.workerArtifactsRoot(stormConf); + String daemonLogRoot = logRootDir(ObjectReader.getString(stormConf.get(LOGVIEWER_APPENDER_NAME))); + + ResourceAuthorizer resourceAuthorizer = new ResourceAuthorizer(stormConf); + + LogviewerLogPageHandler logviewer = new LogviewerLogPageHandler(logRoot, daemonLogRoot, resourceAuthorizer); + LogviewerProfileHandler profileHandler = new LogviewerProfileHandler(logRoot, resourceAuthorizer); + LogviewerLogDownloadHandler logDownloadHandler = new LogviewerLogDownloadHandler(logRoot, daemonLogRoot, + resourceAuthorizer); + LogviewerLogSearchHandler logSearchHandler = new LogviewerLogSearchHandler(stormConf, logRoot, daemonLogRoot, + resourceAuthorizer); + IHttpCredentialsPlugin httpCredsHandler = AuthUtils.GetUiHttpCredentialsPlugin(stormConf); + + singletons.add(new LogviewerResource(logviewer, profileHandler, logDownloadHandler, logSearchHandler, httpCredsHandler)); + singletons.add(new AuthorizationExceptionMapper()); + } + + @Override + public Set<Object> getSingletons() { + return singletons; + } + + public static void setup(Map<String, Object> stormConf) { + LogviewerApplication.stormConf = stormConf; + } + + /** + * Given an appender name, as configured, get the parent directory of the appender's log file. + * Note that if anything goes wrong, this will throw an Error and exit. + */ + private String logRootDir(String appenderName) { + Appender appender = ((LoggerContext) LogManager.getContext()).getConfiguration().getAppender(appenderName); + if (appenderName != null && appender != null && RollingFileAppender.class.isInstance(appender)) { + return new File(((RollingFileAppender) appender).getFileName()).getParent(); + } else { + throw new RuntimeException("Log viewer could not find configured appender, or the appender is not a FileAppender. 
" + + "Please check that the appender name configured in storm and log4j agree."); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java new file mode 100644 index 0000000..fd8ea97 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.storm.daemon.wip.logviewer.webapp;

import com.codahale.metrics.Meter;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.daemon.common.JsonResponseBuilder;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogDownloadHandler;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogPageHandler;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogSearchHandler;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerProfileHandler;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.security.auth.IHttpCredentialsPlugin;
import org.apache.storm.ui.InvalidRequestException;
import org.apache.storm.ui.UIHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.Map;

/**
 * JAX-RS resource exposing the logviewer HTTP endpoints.
 *
 * <p>Each endpoint resolves the requesting user through the configured
 * {@link IHttpCredentialsPlugin}, extracts its parameters from the request and
 * delegates the actual work to one of the injected handlers.
 */
@Path("/")
public class LogviewerResource {
    private static final Logger LOG = LoggerFactory.getLogger(LogviewerResource.class);

    /** Charset name used when URL-decoding file names; explicit so decoding never depends on the platform default. */
    private static final String URL_ENCODING = "UTF-8";

    private static final Meter meterLogPageHttpRequests =
            StormMetricsRegistry.registerMeter("logviewer:num-log-page-http-requests");
    private static final Meter meterDaemonLogPageHttpRequests =
            StormMetricsRegistry.registerMeter("logviewer:num-daemonlog-page-http-requests");
    private static final Meter meterDownloadLogFileHttpRequests =
            StormMetricsRegistry.registerMeter("logviewer:num-download-log-file-http-requests");
    private static final Meter meterDownloadLogDaemonFileHttpRequests =
            StormMetricsRegistry.registerMeter("logviewer:num-download-log-daemon-file-http-requests");
    private static final Meter meterListLogsHttpRequests =
            StormMetricsRegistry.registerMeter("logviewer:num-list-logs-http-requests");

    private final LogviewerLogPageHandler logviewer;
    private final LogviewerProfileHandler profileHandler;
    private final LogviewerLogDownloadHandler logDownloadHandler;
    private final LogviewerLogSearchHandler logSearchHandler;
    private final IHttpCredentialsPlugin httpCredsHandler;

    /**
     * Creates the resource with the handlers it delegates to.
     *
     * @param logviewerParam handler used by the log page and log listing endpoints
     * @param profileHandler handler used by the {@code /dumps} endpoints
     * @param logDownloadHandler handler used by the download endpoints
     * @param logSearchHandler handler used by the search endpoints
     * @param httpCredsHandler plugin used to obtain the user name from a request
     */
    public LogviewerResource(LogviewerLogPageHandler logviewerParam, LogviewerProfileHandler profileHandler,
                             LogviewerLogDownloadHandler logDownloadHandler, LogviewerLogSearchHandler logSearchHandler,
                             IHttpCredentialsPlugin httpCredsHandler) {
        this.logviewer = logviewerParam;
        this.profileHandler = profileHandler;
        this.logDownloadHandler = logDownloadHandler;
        this.logSearchHandler = logSearchHandler;
        this.httpCredsHandler = httpCredsHandler;
    }

    /**
     * Handles {@code GET /log} by delegating to {@link LogviewerLogPageHandler#logPage}.
     *
     * <p>Query parameters: {@code file} (required, URL-decoded before use), {@code start} and
     * {@code length} (optional integers), {@code grep} (optional).
     *
     * @param request the current HTTP request
     * @return the handler's response, or a 400 response when {@code file} is missing or a
     *     parameter is rejected with an {@link InvalidRequestException}
     * @throws IOException propagated from the handler or from URL decoding
     */
    @GET
    @Path("/log")
    public Response log(@Context HttpServletRequest request) throws IOException {
        meterLogPageHttpRequests.mark();

        try {
            String user = httpCredsHandler.getUserName(request);
            Integer start = parseOptionalInteger(request, "start");
            Integer length = parseOptionalInteger(request, "length");
            String file = request.getParameter("file");
            if (file == null) {
                // Without this guard URLDecoder would fail with a NullPointerException (HTTP 500).
                return missingParameter("file");
            }
            String decodedFileName = urlDecode(file);
            String grep = request.getParameter("grep");
            return logviewer.logPage(decodedFileName, start, length, grep, user);
        } catch (InvalidRequestException e) {
            LOG.error(e.getMessage(), e);
            return Response.status(400).entity(e.getMessage()).build();
        }
    }

    /**
     * Handles {@code GET /daemonlog} by delegating to {@link LogviewerLogPageHandler#daemonLogPage}.
     *
     * <p>Accepts the same query parameters as {@link #log(HttpServletRequest)}.
     *
     * @param request the current HTTP request
     * @return the handler's response, or a 400 response when {@code file} is missing or a
     *     parameter is rejected with an {@link InvalidRequestException}
     * @throws IOException propagated from the handler or from URL decoding
     */
    @GET
    @Path("/daemonlog")
    public Response daemonLog(@Context HttpServletRequest request) throws IOException {
        meterDaemonLogPageHttpRequests.mark();

        try {
            String user = httpCredsHandler.getUserName(request);
            Integer start = parseOptionalInteger(request, "start");
            Integer length = parseOptionalInteger(request, "length");
            String file = request.getParameter("file");
            if (file == null) {
                return missingParameter("file");
            }
            String decodedFileName = urlDecode(file);
            String grep = request.getParameter("grep");
            return logviewer.daemonLogPage(decodedFileName, start, length, grep, user);
        } catch (InvalidRequestException e) {
            LOG.error(e.getMessage(), e);
            return Response.status(400).entity(e.getMessage()).build();
        }
    }

    /**
     * Handles {@code GET /searchLogs}. Like {@link #listLogs(HttpServletRequest)}, this delegates to
     * {@link LogviewerLogPageHandler#listLogFiles}, but it does not mark the list-logs meter.
     *
     * @param request the current HTTP request
     * @return the handler's response, or a 400 JSON response when {@code port} is not an integer
     * @throws IOException propagated from the handler
     */
    @GET
    @Path("/searchLogs")
    public Response searchLogs(@Context HttpServletRequest request) throws IOException {
        return listLogFiles(request);
    }

    /**
     * Handles {@code GET /listLogs} by delegating to {@link LogviewerLogPageHandler#listLogFiles}.
     *
     * <p>Query parameters: {@code topoId}, {@code port} (optional integer) and {@code callback};
     * the {@code Origin} request header is forwarded as well.
     *
     * @param request the current HTTP request
     * @return the handler's response, or a 400 JSON response when {@code port} is not an integer
     * @throws IOException propagated from the handler
     */
    @GET
    @Path("/listLogs")
    public Response listLogs(@Context HttpServletRequest request) throws IOException {
        meterListLogsHttpRequests.mark();
        return listLogFiles(request);
    }

    /**
     * Handles {@code GET /dumps/{topo-id}/{host-port}} by delegating to
     * {@link LogviewerProfileHandler#listDumpFiles}.
     *
     * @param topologyId value of the {@code topo-id} path segment
     * @param hostPort value of the {@code host-port} path segment
     * @param request the current HTTP request
     * @return the handler's response
     * @throws IOException propagated from the handler
     */
    @GET
    @Path("/dumps/{topo-id}/{host-port}")
    public Response listDumpFiles(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
                                  @Context HttpServletRequest request) throws IOException {
        String user = httpCredsHandler.getUserName(request);
        return profileHandler.listDumpFiles(topologyId, hostPort, user);
    }

    /**
     * Handles {@code GET /dumps/{topo-id}/{host-port}/{filename}} by delegating to
     * {@link LogviewerProfileHandler#downloadDumpFile}.
     *
     * @param topologyId value of the {@code topo-id} path segment
     * @param hostPort value of the {@code host-port} path segment
     * @param fileName value of the {@code filename} path segment
     * @param request the current HTTP request
     * @return the handler's response
     * @throws IOException propagated from the handler
     */
    @GET
    @Path("/dumps/{topo-id}/{host-port}/{filename}")
    public Response downloadDumpFile(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
                                     @PathParam("filename") String fileName, @Context HttpServletRequest request)
            throws IOException {
        String user = httpCredsHandler.getUserName(request);
        return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user);
    }

    /**
     * Handles {@code GET /download} by delegating to {@link LogviewerLogDownloadHandler#downloadLogFile}.
     *
     * @param request the current HTTP request; the {@code file} query parameter is required
     * @return the handler's response, or a 400 response when {@code file} is missing
     * @throws IOException propagated from the handler or from URL decoding
     */
    @GET
    @Path("/download")
    public Response downloadLogFile(@Context HttpServletRequest request) throws IOException {
        meterDownloadLogFileHttpRequests.mark();

        String user = httpCredsHandler.getUserName(request);
        String file = request.getParameter("file");
        if (file == null) {
            return missingParameter("file");
        }
        return logDownloadHandler.downloadLogFile(urlDecode(file), user);
    }

    /**
     * Handles {@code GET /daemondownload} by delegating to
     * {@link LogviewerLogDownloadHandler#downloadDaemonLogFile}.
     *
     * @param request the current HTTP request; the {@code file} query parameter is required
     * @return the handler's response, or a 400 response when {@code file} is missing
     * @throws IOException propagated from the handler or from URL decoding
     */
    @GET
    @Path("/daemondownload")
    public Response downloadDaemonLogFile(@Context HttpServletRequest request) throws IOException {
        meterDownloadLogDaemonFileHttpRequests.mark();

        String user = httpCredsHandler.getUserName(request);
        String file = request.getParameter("file");
        if (file == null) {
            return missingParameter("file");
        }
        return logDownloadHandler.downloadDaemonLogFile(urlDecode(file), user);
    }

    /**
     * Handles {@code GET /search/{file}} by delegating to {@link LogviewerLogSearchHandler#searchLogFile}.
     *
     * <p>Query parameters: {@code is-daemon} (treated as true only when equal to {@code "yes"}),
     * {@code search-string}, {@code num-matches}, {@code start-byte-offset} and {@code callback};
     * the {@code Origin} request header is forwarded as well.
     *
     * @param file value of the {@code file} path segment, URL-decoded before use
     * @param request the current HTTP request
     * @return the handler's response, or a 400 JSON response when the handler throws
     *     {@link InvalidRequestException}
     * @throws IOException propagated from the handler or from URL decoding
     */
    @GET
    @Path("/search/{file}")
    public Response search(@PathParam("file") String file, @Context HttpServletRequest request) throws IOException {
        String user = httpCredsHandler.getUserName(request);
        boolean isDaemon = StringUtils.equals(request.getParameter("is-daemon"), "yes");
        String decodedFileName = urlDecode(file);
        String searchString = request.getParameter("search-string");
        String numMatchesStr = request.getParameter("num-matches");
        String startByteOffset = request.getParameter("start-byte-offset");
        String callback = request.getParameter("callback");
        String origin = request.getHeader("Origin");

        try {
            return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon, searchString, numMatchesStr,
                    startByteOffset, callback, origin);
        } catch (InvalidRequestException e) {
            LOG.error(e.getMessage(), e);
            return jsonBadRequest(e, callback);
        }
    }

    /**
     * Handles {@code GET /deepSearch/{topoId}} by delegating to
     * {@link LogviewerLogSearchHandler#deepSearchLogsForTopology}.
     *
     * <p>All query parameters are forwarded as raw strings, except {@code search-archived}, which
     * is converted with {@link BooleanUtils#toBooleanObject(String)}.
     *
     * @param topologyId value of the {@code topoId} path segment
     * @param request the current HTTP request
     * @return the handler's response
     * @throws IOException propagated from the handler
     */
    @GET
    @Path("/deepSearch/{topoId}")
    public Response deepSearch(@PathParam("topoId") String topologyId,
                               @Context HttpServletRequest request) throws IOException {
        String user = httpCredsHandler.getUserName(request);
        String searchString = request.getParameter("search-string");
        String numMatchesStr = request.getParameter("num-matches");
        String portStr = request.getParameter("port");
        String startFileOffset = request.getParameter("start-file-offset");
        String startByteOffset = request.getParameter("start-byte-offset");
        String searchArchived = request.getParameter("search-archived");
        String callback = request.getParameter("callback");
        String origin = request.getHeader("Origin");

        return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr,
                startFileOffset, startByteOffset, BooleanUtils.toBooleanObject(searchArchived), callback, origin);
    }

    /**
     * Shared implementation of {@code /searchLogs} and {@code /listLogs}: reads {@code topoId},
     * {@code port}, {@code callback} and the {@code Origin} header and calls
     * {@link LogviewerLogPageHandler#listLogFiles}.
     */
    private Response listLogFiles(HttpServletRequest request) throws IOException {
        String user = httpCredsHandler.getUserName(request);
        String topologyId = request.getParameter("topoId");
        String callback = request.getParameter("callback");
        String origin = request.getHeader("Origin");

        Integer port;
        try {
            port = parseOptionalInteger(request, "port");
        } catch (InvalidRequestException e) {
            // A malformed port is a client error; report it the same way search() reports bad input.
            LOG.error(e.getMessage(), e);
            return jsonBadRequest(e, callback);
        }

        return logviewer.listLogFiles(user, port, topologyId, callback, origin);
    }

    /** Builds the 400 JSON error response used for requests rejected with {@link InvalidRequestException}. */
    private static Response jsonBadRequest(InvalidRequestException e, String callback) {
        return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(e)).setCallback(callback)
                .setStatus(400).build();
    }

    /** Builds a plain 400 response stating that the named query parameter is required. */
    private static Response missingParameter(String parameterKey) {
        String message = "Missing required query parameter '" + parameterKey + "'";
        LOG.error(message);
        return Response.status(400).entity(message).build();
    }

    /** URL-decodes {@code value} as UTF-8 rather than with the platform default charset. */
    private static String urlDecode(String value) throws IOException {
        return URLDecoder.decode(value, URL_ENCODING);
    }

    /**
     * Returns the named query parameter as an integer, or {@code null} when the parameter is absent.
     *
     * @throws InvalidRequestException if the parameter is present but is not a valid integer
     */
    private Integer parseOptionalInteger(HttpServletRequest request, String parameterKey)
            throws InvalidRequestException {
        if (request.getParameter(parameterKey) == null) {
            return null;
        }
        return parseIntegerFromMap(request.getParameterMap(), parameterKey);
    }

    /**
     * Parses the first value stored under {@code parameterKey} in a servlet parameter map.
     *
     * @param map parameter map whose values are {@code String[]}; the key must be present
     * @param parameterKey name of the parameter to parse
     * @return the parsed integer
     * @throws InvalidRequestException if the value is not a valid integer
     */
    private int parseIntegerFromMap(Map map, String parameterKey) throws InvalidRequestException {
        try {
            return Integer.parseInt(((String[]) map.get(parameterKey))[0]);
        } catch (NumberFormatException ex) {
            throw new InvalidRequestException("Could not make an integer out of the query parameter '"
                    + parameterKey + "'", ex);
        }
    }

}
