http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java
new file mode 100644
index 0000000..c0a69cb
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerLogSearchHandler.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.handler;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.DirectoryCleaner;
+import org.apache.storm.daemon.common.JsonResponseBuilder;
+import org.apache.storm.daemon.utils.StreamUtil;
+import org.apache.storm.daemon.utils.URLBuilder;
+import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder;
+import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.daemon.wip.logviewer.utils.WorkerLogs;
+import org.apache.storm.ui.InvalidRequestException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.storm.daemon.utils.ListFunctionalSupport.drop;
+import static org.apache.storm.daemon.utils.ListFunctionalSupport.first;
+import static org.apache.storm.daemon.utils.ListFunctionalSupport.last;
+import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest;
+import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast;
+import static 
org.apache.storm.daemon.wip.logviewer.LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
+
+public class LogviewerLogSearchHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
+
+    /** Upper bound, in UTF-8 bytes, for a search string; also the size of one half of the grep buffer. */
+    public static final int GREP_MAX_SEARCH_SIZE = 1024;
+    /** Size of the grep buffer; the buffer rotation copies it in halves of {@link #GREP_MAX_SEARCH_SIZE}. */
+    public static final int GREP_BUF_SIZE = 2048;
+    /** Number of bytes of context reported before and after each match. */
+    public static final int GREP_CONTEXT_SIZE = 128;
+    /** File name filter applied to the files of a port directory before they are searched. */
+    public static final Pattern WORKER_LOG_FILENAME_PATTERN = Pattern.compile("^worker.log(.*)");
+
+    private final Map<String, Object> stormConf;
+    private final String logRoot;
+    private final String daemonLogRoot;
+    private final ResourceAuthorizer resourceAuthorizer;
+    /** Port placed into the logviewer URLs that accompany each match. */
+    private final Integer logviewerPort;
+
+    /**
+     * Creates a search handler.
+     *
+     * @param stormConf daemon configuration; the logviewer port is read from it once, here
+     * @param logRoot root directory used for non-daemon (worker) log lookups
+     * @param daemonLogRoot root directory used for daemon log lookups
+     * @param resourceAuthorizer consulted before a user may search a worker log
+     */
+    public LogviewerLogSearchHandler(Map<String, Object> stormConf, String logRoot, String daemonLogRoot,
+                                     ResourceAuthorizer resourceAuthorizer) {
+        this.resourceAuthorizer = resourceAuthorizer;
+        this.daemonLogRoot = daemonLogRoot;
+        this.logRoot = logRoot;
+        this.stormConf = stormConf;
+
+        Object configuredPort = stormConf.get(DaemonConfig.LOGVIEWER_PORT);
+        this.logviewerPort = ObjectReader.getInt(configuredPort);
+    }
+
+    public Response searchLogFile(String fileName, String user, boolean 
isDaemon, String search,
+                                  String numMatchesStr, String offsetStr, 
String callback, String origin)
+            throws IOException, InvalidRequestException {
+        String rootDir = isDaemon ? daemonLogRoot : logRoot;
+        File file = new File(rootDir, fileName).getCanonicalFile();
+        Response response;
+        if (file.exists()) {
+            if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, 
fileName)) {
+                Integer numMatchesInt = numMatchesStr != null ? 
tryParseIntParam("num-matches", numMatchesStr) : null;
+                Integer offsetInt = offsetStr != null ? 
tryParseIntParam("start-byte-offset", offsetStr) : null;
+
+                try {
+                    if (StringUtils.isNotEmpty(search) && 
search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) {
+                        Map<String, Object> entity = new HashMap<>();
+                        entity.put("isDaemon", isDaemon ? "yes" : "no");
+                        entity.putAll(substringSearch(file, search, isDaemon, 
numMatchesInt, offsetInt));
+
+                        response = 
LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin);
+                    } else {
+                        throw new InvalidRequestException("Search substring 
must be between 1 and 1024 "
+                                + "UTF-8 bytes in size (inclusive)");
+                    }
+                } catch (Exception ex) {
+                    response = 
LogviewerResponseBuilder.buildExceptionJsonResponse(ex, callback);
+                }
+            } else {
+                // unauthorized
+                response = 
LogviewerResponseBuilder.buildUnauthorizedUserJsonResponse(user, callback);
+            }
+        } else {
+            // not found
+            Map<String, String> entity = new HashMap<>();
+            entity.put("error", "Not Found");
+            entity.put("errorMessage", "The file was not found on this node.");
+
+            response = new 
JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build();
+        }
+
+        return response;
+    }
+
+    public Response deepSearchLogsForTopology(String topologyId, String user, 
String search,
+                                              String numMatchesStr, String 
portStr, String fileOffsetStr, String offsetStr,
+                                              Boolean searchArchived, String 
callback, String origin) {
+        String rootDir = logRoot;
+        Object returnValue;
+        File topologyDir = new File(rootDir + Utils.FILE_PATH_SEPARATOR + 
topologyId);
+        if (StringUtils.isEmpty(search) || !topologyDir.exists()) {
+            returnValue = new ArrayList<>();
+        } else {
+            int fileOffset = ObjectReader.getInt(fileOffsetStr, 0);
+            int offset = ObjectReader.getInt(offsetStr, 0);
+            int numMatches = ObjectReader.getInt(numMatchesStr, 1);
+
+            File[] portDirsArray = topologyDir.listFiles();
+            List<File> portDirs;
+            if (portDirsArray != null) {
+                portDirs = Arrays.asList(portDirsArray);
+            } else {
+                portDirs = new ArrayList<>();
+            }
+
+            if (StringUtils.isEmpty(portStr) || portStr.equals("*")) {
+                // check for all ports
+                List<List<File>> filteredLogs = portDirs.stream()
+                        .map(portDir -> logsForPort(user, portDir))
+                        .filter(logs -> logs != null && !logs.isEmpty())
+                        .collect(toList());
+
+                if (BooleanUtils.isTrue(searchArchived)) {
+                    returnValue = filteredLogs.stream()
+                            .map(fl -> findNMatches(fl, numMatches, 0, 0, 
search))
+                            .collect(toList());
+                } else {
+                    returnValue = filteredLogs.stream()
+                            .map(fl -> Collections.singletonList(first(fl)))
+                            .map(fl -> findNMatches(fl, numMatches, 0, 0, 
search))
+                            .collect(toList());
+                }
+            } else {
+                int port = Integer.parseInt(portStr);
+                // check just the one port
+                List<Integer> slotsPorts = (List<Integer>) 
stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
+                        new ArrayList<>());
+                boolean containsPort = slotsPorts.stream()
+                        .anyMatch(slotPort -> slotPort != null && (slotPort == 
port));
+                if (!containsPort) {
+                    returnValue = new ArrayList<>();
+                } else {
+                    File portDir = new File(rootDir + 
Utils.FILE_PATH_SEPARATOR + topologyId +
+                            Utils.FILE_PATH_SEPARATOR + port);
+
+                    if (!portDir.exists() || logsForPort(user, 
portDir).isEmpty()) {
+                        returnValue = new ArrayList<>();
+                    } else {
+                        List<File> filteredLogs = logsForPort(user, portDir);
+                        if (BooleanUtils.isTrue(searchArchived)) {
+                            returnValue = findNMatches(filteredLogs, 
numMatches, fileOffset, offset, search);
+                        } else {
+                            returnValue = 
findNMatches(Collections.singletonList(first(filteredLogs)),
+                                    numMatches, 0, offset, search);
+                        }
+                    }
+                }
+            }
+        }
+
+        return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, 
callback, origin);
+    }
+
+    private Integer tryParseIntParam(String paramName, String value) throws 
InvalidRequestException {
+        try {
+            return Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+            throw new InvalidRequestException("Could not parse " + paramName + 
" to an integer");
+        }
+    }
+
+    private Map<String,Object> substringSearch(File file, String searchString, 
int numMatches, int startByteOffset) throws InvalidRequestException {
+        return substringSearch(file, searchString, false, numMatches, 
startByteOffset);
+    }
+
+    /**
+     * Searches a (possibly gzipped) log file for a substring, starting at a byte offset.
+     *
+     * <p>The file is scanned through a buffer of {@link #GREP_BUF_SIZE} bytes that is rotated by half its
+     * size whenever more matches are wanted and more bytes are available.
+     *
+     * @param file file to search; names ending in {@code .gz} are read through a {@link GZIPInputStream}
+     * @param searchString non-empty string of at most {@link #GREP_MAX_SEARCH_SIZE} UTF-8 bytes
+     * @param isDaemon whether the file is a daemon log; selects the URL style of the matches
+     * @param numMatches maximum number of matches to collect, 10 when {@code null}
+     * @param startByteOffset offset at which the search starts, 0 when {@code null}
+     * @return a map with the keys {@code isDaemon}, {@code searchString}, {@code startByteOffset},
+     *     {@code matches} and, if there is more of the file to search, {@code nextByteOffset}
+     * @throws InvalidRequestException if the search would start or continue past the end of the file
+     * @throws IllegalArgumentException if {@code searchString} is empty or too long
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    private Map<String,Object> substringSearch(File file, String searchString, boolean isDaemon, Integer numMatches, Integer startByteOffset) throws InvalidRequestException {
+        try {
+            if (StringUtils.isEmpty(searchString)) {
+                throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
+            }
+            byte[] searchBytes = searchString.getBytes("UTF-8");
+            if (searchBytes.length > GREP_MAX_SEARCH_SIZE) {
+                throw new IllegalArgumentException("Precondition fails: the length of search string should be less than " + GREP_MAX_SEARCH_SIZE);
+            }
+
+            int maxMatches = numMatches != null ? numMatches : 10;
+            int startOffset = startByteOffset != null ? startByteOffset : 0;
+            boolean isZipFile = file.getName().endsWith(".gz");
+
+            // try-with-resources: the file handle is released on every exit path, including the
+            // exceptions thrown below and a failing GZIPInputStream constructor.
+            try (FileInputStream fis = new FileInputStream(file);
+                 InputStream rawStream = isZipFile ? new GZIPInputStream(fis) : fis;
+                 BufferedInputStream stream = new BufferedInputStream(rawStream)) {
+
+                int fileLength;
+                if (isZipFile) {
+                    fileLength = (int) ServerUtils.zipFileSize(file);
+                } else {
+                    fileLength = (int) file.length();
+                }
+
+                // Start at the part of the log file we are interested in.
+                // Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files
+                if (startOffset > fileLength) {
+                    throw new InvalidRequestException("Cannot search past the end of the file");
+                }
+
+                if (startOffset > 0) {
+                    StreamUtil.skipBytes(stream, startOffset);
+                }
+
+                // ByteBuffer.allocate() hands out a zero-filled backing array.
+                ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE);
+                byte[] bufArray = buf.array();
+
+                int bytesRead = stream.read(bufArray, 0, Math.min(fileLength, GREP_BUF_SIZE));
+                if (bytesRead < 0) {
+                    // End of stream right at the start offset: nothing to search, but not an error.
+                    bytesRead = 0;
+                }
+                buf.limit(bytesRead);
+                int totalBytesRead = bytesRead;
+
+                List<Map<String, Object>> initialMatches = new ArrayList<>();
+                int initBufOffset = 0;
+                int byteOffset = startOffset;
+                byte[] beforeBytes = null;
+
+                while (true) {
+                    SubstringSearchResult searchRet = bufferSubstringSearch(isDaemon, file, fileLength, byteOffset, initBufOffset,
+                            stream, startOffset, totalBytesRead, buf, searchBytes, initialMatches, maxMatches, beforeBytes);
+
+                    List<Map<String, Object>> matches = searchRet.getMatches();
+
+                    if (matches.size() < maxMatches && totalBytesRead + startOffset < fileLength) {
+                        // The start index is positioned to find any possible
+                        // occurrence search string that did not quite fit in the
+                        // buffer on the previous read.
+                        int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length;
+
+                        totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, file, fileLength);
+                        if (totalBytesRead < 0) {
+                            throw new InvalidRequestException("Cannot search past the end of the file");
+                        }
+
+                        initialMatches = matches;
+                        initBufOffset = newBufOffset;
+                        byteOffset = searchRet.getNewByteOffset();
+                        beforeBytes = searchRet.getNewBeforeBytes();
+                    } else {
+                        Map<String, Object> ret = new HashMap<>();
+                        ret.put("isDaemon", isDaemon ? "yes" : "no");
+
+                        // A next offset can only be derived from the last match; without any match there
+                        // is nothing to continue from.
+                        Integer nextByteOffset = null;
+                        if (!matches.isEmpty() && (matches.size() >= maxMatches || totalBytesRead < fileLength)) {
+                            int candidate = ((Number) last(matches).get("byteOffset")).intValue() + searchBytes.length;
+                            if (candidate < fileLength) {
+                                nextByteOffset = candidate;
+                            }
+                        }
+                        ret.putAll(mkGrepResponse(searchBytes, startOffset, matches, nextByteOffset));
+                        return ret;
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    /**
+     * Get the filtered, authorized, sorted log files for a port.
+     *
+     * <p>Only files whose name matches {@link #WORKER_LOG_FILENAME_PATTERN} and which the user is allowed
+     * to access are returned, most recently modified first.
+     *
+     * @param user user whose access rights are checked
+     * @param portDir directory holding the logs of one worker port
+     * @return the matching files, newest first; possibly empty
+     * @throws RuntimeException wrapping the {@link IOException} raised when the directory cannot be listed
+     */
+    private List<File> logsForPort(String user, File portDir) {
+        try {
+            return DirectoryCleaner.getFilesForDir(portDir).stream()
+                    // find() is what Pattern.asPredicate() evaluates; no predicate object per file needed.
+                    .filter(file -> WORKER_LOG_FILENAME_PATTERN.matcher(file.getName()).find())
+                    .filter(log -> resourceAuthorizer.isUserAllowedToAccessFile(user, WorkerLogs.getTopologyPortWorkerLog(log)))
+                    // Long.compare instead of casting the difference to int: timestamps further apart than
+                    // Integer.MAX_VALUE milliseconds (about 25 days) would otherwise flip the order.
+                    .sorted((f1, f2) -> Long.compare(f2.lastModified(), f1.lastModified()))
+                    .collect(toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Searches a list of log files in order until enough matches have been collected.
+     *
+     * <p>A file whose search is rejected with an {@link InvalidRequestException} is logged and skipped
+     * without contributing an entry to the result.
+     *
+     * @param logs log files to search, in search order
+     * @param numMatches total number of matches to collect across all files
+     * @param fileOffset number of leading files to skip
+     * @param offset byte offset into the first searched file; later files are searched from 0
+     * @param search substring to look for
+     * @return the per-file results together with the index of the file at which the search stopped
+     */
+    private Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset, String search) {
+        logs = drop(logs, fileOffset);
+
+        List<Map<String, Object>> matches = new ArrayList<>();
+        int matchCount = 0;
+
+        while (!logs.isEmpty()) {
+            File firstLog = logs.get(0);
+            Map<String, Object> theseMatches;
+            try {
+                LOG.debug("Looking through {}", firstLog);
+                theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset);
+            } catch (InvalidRequestException e) {
+                LOG.error("Can't search past end of file.", e);
+                theseMatches = new HashMap<>();
+            }
+
+            if (theseMatches.isEmpty()) {
+                // The search of this file failed: there is no "matches" entry to count, so move on to the
+                // next file. matches and matchCount are not changed.
+                logs = rest(logs);
+                offset = 0;
+                fileOffset = fileOffset + 1;
+                continue;
+            }
+
+            Map<String, Object> currentFileMatch = new HashMap<>(theseMatches);
+            currentFileMatch.put("fileName", WorkerLogs.getTopologyPortWorkerLog(firstLog));
+            List<String> splitPath;
+            try {
+                splitPath = Arrays.asList(firstLog.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            // The port is the name of the directory that contains the log file.
+            currentFileMatch.put("port", first(takeLast(splitPath, 2)));
+            matches.add(currentFileMatch);
+
+            int newCount = matchCount + ((List<?>) theseMatches.get("matches")).size();
+            if (newCount >= numMatches) {
+                break;
+            }
+
+            logs = rest(logs);
+            offset = 0;
+            fileOffset = fileOffset + 1;
+            matchCount = newCount;
+        }
+
+        return new Matched(fileOffset, search, matches);
+    }
+
+
+    /**
+     * As the file is read into a buffer, 1/2 the buffer's size at a time, we search the buffer for matches
+     * of the substring and return a list of zero or more matches.
+     *
+     * <p>Matches are appended to {@code initialMatches} itself. Besides the matches, the result carries the
+     * byte offset for the next round and the bytes that serve as leading context for matches found early
+     * in the next buffer.
+     *
+     * @param isDaemon whether the file is a daemon log (selects the URL style of each match)
+     * @param file file being searched
+     * @param fileLength length of the file as determined by the caller
+     * @param offsetToBuf file offset that corresponds to index 0 of the buffer
+     * @param initBufOffset buffer index at which the search starts
+     * @param stream stream the buffer is filled from; used to read ahead for trailing context
+     * @param bytesSkipped number of bytes skipped before the first read
+     * @param bytesRead number of bytes read so far
+     * @param haystack buffer to search
+     * @param needle bytes to look for
+     * @param initialMatches matches collected so far
+     * @param numMatches maximum number of matches to collect
+     * @param beforeBytes tail of the previously searched bytes, or {@code null}
+     * @return the matches, the next byte offset and the next leading-context bytes
+     * @throws IOException if reading ahead or resolving the canonical path fails
+     */
+    private SubstringSearchResult bufferSubstringSearch(boolean isDaemon, File file, int fileLength, int offsetToBuf,
+                                                        int initBufOffset, BufferedInputStream stream, Integer bytesSkipped,
+                                                        int bytesRead, ByteBuffer haystack, byte[] needle,
+                                                        List<Map<String, Object>> initialMatches, Integer numMatches, byte[] beforeBytes) throws
+            IOException {
+        List<Map<String, Object>> found = initialMatches;
+        int searchFrom = initBufOffset;
+
+        int hit = offsetOfBytes(haystack.array(), needle, searchFrom);
+        while (found.size() < numMatches && hit >= 0) {
+            int positionInFile = offsetToBuf + hit;
+            int lastOffsetWithFullContext = haystack.limit() - GREP_CONTEXT_SIZE - needle.length;
+
+            // Leading context has to come from the previous buffer when the match is near the start.
+            byte[] leadingContext = hit < GREP_CONTEXT_SIZE ? beforeBytes : null;
+
+            // Trailing context has to be read ahead when the match is near the end.
+            byte[] trailingContext = null;
+            if (hit > lastOffsetWithFullContext) {
+                trailingContext = tryReadAhead(stream, haystack, hit, fileLength, bytesRead);
+            }
+
+            searchFrom = hit + needle.length;
+            found.add(mkMatchData(needle, haystack, hit, positionInFile,
+                    file.getCanonicalPath(), isDaemon, leadingContext, trailingContext));
+
+            hit = offsetOfBytes(haystack.array(), needle, searchFrom);
+        }
+
+        int tailEnd = Math.min(haystack.limit(), GREP_MAX_SEARCH_SIZE);
+        int tailStart = Math.max(0, tailEnd - GREP_CONTEXT_SIZE);
+        byte[] newBeforeBytes = Arrays.copyOfRange(haystack.array(), tailStart, tailEnd);
+
+        // It's OK if new-byte-offset is negative.
+        // This is normal if we are out of bytes to read from a small file.
+        Integer newByteOffset;
+        if (found.size() >= numMatches) {
+            newByteOffset = ((Number) last(found).get("byteOffset")).intValue() + needle.length;
+        } else {
+            newByteOffset = bytesSkipped + bytesRead - GREP_MAX_SEARCH_SIZE;
+        }
+
+        return new SubstringSearchResult(found, newByteOffset, newBeforeBytes);
+    }
+
+    private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, 
int totalBytesRead, File file,
+                                 int fileLength) throws IOException {
+        byte[] bufArray = buf.array();
+
+        // Copy the 2nd half of the buffer to the first half.
+        System.arraycopy(bufArray, GREP_MAX_SEARCH_SIZE, bufArray, 0, 
GREP_MAX_SEARCH_SIZE);
+
+        // Zero-out the 2nd half to prevent accidental matches.
+        Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0);
+
+        // Fill the 2nd half with new bytes from the stream.
+        int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, 
Math.min((int) fileLength, GREP_MAX_SEARCH_SIZE));
+        buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead);
+        return totalBytesRead + bytesRead;
+    }
+
+
+    /**
+     * Builds the description of a single match: its file offset, the matched string, up to
+     * {@link #GREP_CONTEXT_SIZE} bytes of context on either side and a logviewer URL for the match.
+     *
+     * @param needle the bytes that were searched for
+     * @param haystack buffer in which the match was found
+     * @param haystackOffset index of the match within the buffer
+     * @param fileOffset offset of the match within the file
+     * @param fname path of the searched file, used to build the URL
+     * @param isDaemon whether the file is a daemon log (selects the URL style)
+     * @param beforeBytes bytes preceding the buffer, used when the match is too close to the buffer start
+     *     to have full leading context; may be {@code null}
+     * @param afterBytes bytes following the buffer, used when the match is too close to the buffer end to
+     *     have full trailing context; may be {@code null}
+     * @return a map with the keys {@code byteOffset}, {@code beforeString}, {@code afterString},
+     *     {@code matchString} and {@code logviewerURL}
+     * @throws UnsupportedEncodingException declared by the charset-name based string conversions
+     * @throws UnknownHostException if the local host name for the URL cannot be determined
+     */
+    private Map<String, Object> mkMatchData(byte[] needle, ByteBuffer haystack, int haystackOffset, int fileOffset, String fname,
+                                            boolean isDaemon, byte[] beforeBytes, byte[] afterBytes)
+            throws UnsupportedEncodingException, UnknownHostException {
+        String url;
+        if (isDaemon) {
+            url = urlToMatchCenteredInLogPageDaemonFile(needle, fname, fileOffset, logviewerPort);
+        } else {
+            url = urlToMatchCenteredInLogPage(needle, fname, fileOffset, logviewerPort);
+        }
+
+        byte[] haystackBytes = haystack.array();
+        String beforeString;
+        String afterString;
+
+        if (haystackOffset >= GREP_CONTEXT_SIZE) {
+            // The buffer alone holds the full leading context: exactly the GREP_CONTEXT_SIZE bytes in
+            // front of the match, nothing more.
+            beforeString = new String(haystackBytes, haystackOffset - GREP_CONTEXT_SIZE, GREP_CONTEXT_SIZE, "UTF-8");
+        } else {
+            int numDesired = Math.max(0, GREP_CONTEXT_SIZE - haystackOffset);
+            int beforeSize = beforeBytes != null ? beforeBytes.length : 0;
+            int numExpected = Math.min(beforeSize, numDesired);
+
+            if (numExpected > 0) {
+                // Take the LAST numExpected bytes of beforeBytes; they directly precede the buffer.
+                StringBuilder sb = new StringBuilder();
+                sb.append(new String(beforeBytes, beforeSize - numExpected, numExpected, "UTF-8"));
+                sb.append(new String(haystackBytes, 0, haystackOffset, "UTF-8"));
+                beforeString = sb.toString();
+            } else {
+                beforeString = new String(haystackBytes, 0, haystackOffset, "UTF-8");
+            }
+        }
+
+        int needleSize = needle.length;
+        int afterOffset = haystackOffset + needleSize;
+        int haystackSize = haystack.limit();
+
+        if ((afterOffset + GREP_CONTEXT_SIZE) < haystackSize) {
+            afterString = new String(haystackBytes, afterOffset, GREP_CONTEXT_SIZE, "UTF-8");
+        } else {
+            int numDesired = GREP_CONTEXT_SIZE - (haystackSize - afterOffset);
+            int afterSize = afterBytes != null ? afterBytes.length : 0;
+            int numExpected = Math.min(afterSize, numDesired);
+
+            if (numExpected > 0) {
+                // Take the FIRST numExpected bytes of afterBytes; they directly follow the buffer.
+                StringBuilder sb = new StringBuilder();
+                sb.append(new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8"));
+                sb.append(new String(afterBytes, 0, numExpected, "UTF-8"));
+                afterString = sb.toString();
+            } else {
+                afterString = new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8");
+            }
+        }
+
+        Map<String, Object> ret = new HashMap<>();
+        ret.put("byteOffset", fileOffset);
+        ret.put("beforeString", beforeString);
+        ret.put("afterString", afterString);
+        ret.put("matchString", new String(needle, "UTF-8"));
+        ret.put("logviewerURL", url);
+
+        return ret;
+    }
+
+    /**
+     * Tries once to read ahead in the stream to fill the context and
+     * resets the stream to its position before the call.
+     *
+     * @param stream stream to peek into; its read position is unchanged when this method returns
+     * @param haystack not used by this method
+     * @param offset not used by this method
+     * @param fileLength length of the file as determined by the caller
+     * @param bytesRead number of bytes read so far
+     * @return the bytes that could be read ahead, at most {@link #GREP_CONTEXT_SIZE}; empty if nothing is
+     *     expected to be left or the stream is already at its end
+     * @throws IOException if reading or resetting the stream fails
+     */
+    private byte[] tryReadAhead(BufferedInputStream stream, ByteBuffer haystack, int offset, int fileLength, int bytesRead) throws IOException {
+        int numExpected = Math.min(fileLength - bytesRead, GREP_CONTEXT_SIZE);
+        if (numExpected <= 0) {
+            // Nothing left according to the caller's bookkeeping; a negative value must not be used as an
+            // array size.
+            return new byte[0];
+        }
+
+        byte[] afterBytes = new byte[numExpected];
+        stream.mark(numExpected);
+        // Only try reading once.
+        int numRead = stream.read(afterBytes, 0, numExpected);
+        stream.reset();
+
+        // Hand back only what was actually read, so unread zero bytes never show up as context.
+        if (numRead < numExpected) {
+            return Arrays.copyOf(afterBytes, Math.max(0, numRead));
+        }
+        return afterBytes;
+    }
+
+    /**
+     * Searches a given byte array for a match of a sub-array of bytes.
+     * Returns the offset to the byte that matches, or -1 if no match was found.
+     *
+     * @param buffer array to search; the whole array is considered
+     * @param search bytes to look for; must not be empty
+     * @param initOffset index at which the search starts; must not be negative
+     * @return index of the first occurrence at or after {@code initOffset}, or -1
+     * @throws IllegalArgumentException if {@code search} is empty or {@code initOffset} is negative
+     */
+    private int offsetOfBytes(byte[] buffer, byte[] search, int initOffset) {
+        if (search.length <= 0) {
+            throw new IllegalArgumentException("Search array should not be empty.");
+        }
+
+        if (initOffset < 0) {
+            throw new IllegalArgumentException("Start offset shouldn't be negative.");
+        }
+
+        // A match needs search.length bytes, so no candidate beyond this index can succeed.
+        int lastCandidate = buffer.length - search.length;
+
+        for (int candidate = initOffset; candidate <= lastCandidate; candidate++) {
+            int matched = 0;
+            while (matched < search.length && buffer[candidate + matched] == search[matched]) {
+                matched++;
+            }
+            if (matched == search.length) {
+                return candidate;
+            }
+        }
+
+        // We ran out of buffer for the search.
+        return -1;
+    }
+
+    /**
+     * This response data only includes a next byte offset if there is more of the file to read.
+     *
+     * @param searchBytes the searched bytes, reported back as a UTF-8 string
+     * @param offset byte offset at which the search started
+     * @param matches matches that were found
+     * @param nextByteOffset offset at which a follow-up search should start, or {@code null}
+     * @return map with the keys {@code searchString}, {@code startByteOffset}, {@code matches} and,
+     *     when given, {@code nextByteOffset}
+     * @throws UnsupportedEncodingException declared by the charset-name based string conversion
+     */
+    private Map<String, Object> mkGrepResponse(byte[] searchBytes, Integer offset, List<Map<String, Object>> matches,
+                                               Integer nextByteOffset) throws UnsupportedEncodingException {
+        String searchedFor = new String(searchBytes, "UTF-8");
+        boolean hasMore = nextByteOffset != null;
+
+        Map<String, Object> response = new HashMap<>();
+        response.put("searchString", searchedFor);
+        response.put("startByteOffset", offset);
+        response.put("matches", matches);
+        if (hasMore) {
+            response.put("nextByteOffset", nextByteOffset);
+        }
+
+        return response;
+    }
+
+    /**
+     * Builds the logviewer URL of the worker log page on which the given match is centered.
+     *
+     * @param needle the matched bytes; half of their length shifts the page start so the match is centered
+     * @param fname path of the log file; its last three path segments become the {@code file} parameter
+     * @param offset file offset of the match
+     * @param port logviewer port to put into the URL
+     * @return URL of the log page
+     * @throws UnknownHostException if the local host name cannot be determined
+     */
+    private String urlToMatchCenteredInLogPage(byte[] needle, String fname, int offset, Integer port) throws UnknownHostException {
+        String host = Utils.hostname();
+        String splittedFileName = String.join(Utils.FILE_PATH_SEPARATOR,
+                takeLast(Arrays.asList(fname.split(Utils.FILE_PATH_SEPARATOR)), 3));
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("file", splittedFileName);
+        // Subtracting (needle.length / -2) ADDS half of the needle length: the page starts half a page
+        // before the middle of the match, clamped to the beginning of the file.
+        parameters.put("start", Math.max(0, offset - (DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2)));
+        parameters.put("length", DEFAULT_BYTES_PER_PAGE);
+
+        return URLBuilder.build(String.format("http://%s:%d/api/v1/log", host, port), parameters);
+    }
+
+    /**
+     * Builds the logviewer URL of the daemon log page on which the given match is centered.
+     *
+     * @param needle the matched bytes; half of their length shifts the page start so the match is centered
+     * @param fname path of the log file; only its last path segment becomes the {@code file} parameter
+     * @param offset file offset of the match
+     * @param port logviewer port to put into the URL
+     * @return URL of the daemon log page
+     * @throws UnknownHostException if the local host name cannot be determined
+     */
+    private String urlToMatchCenteredInLogPageDaemonFile(byte[] needle, String fname, int offset, Integer port) throws UnknownHostException {
+        String host = Utils.hostname();
+        String splittedFileName = String.join(Utils.FILE_PATH_SEPARATOR,
+                takeLast(Arrays.asList(fname.split(Utils.FILE_PATH_SEPARATOR)), 1));
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("file", splittedFileName);
+        // Subtracting (needle.length / -2) ADDS half of the needle length: the page starts half a page
+        // before the middle of the match, clamped to the beginning of the file.
+        parameters.put("start", Math.max(0, offset - (DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2)));
+        parameters.put("length", DEFAULT_BYTES_PER_PAGE);
+
+        return URLBuilder.build(String.format("http://%s:%d/api/v1/daemonlog", host, port), parameters);
+    }
+
+    /**
+     * Result of a search across several log files. It is rendered to JSON through its getters, so the
+     * emitted properties are {@code fileOffset}, {@code searchString} and {@code matches}.
+     */
+    private static class Matched implements JSONAware {
+        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+        private final int fileOffset;
+        private final String searchString;
+        private final List<Map<String, Object>> matches;
+
+        /**
+         * Creates a result.
+         *
+         * @param fileOffset index of the log file at which the search stopped
+         * @param searchString the string that was searched for
+         * @param matches one entry per searched file that produced a result
+         */
+        public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches) {
+            this.matches = matches;
+            this.searchString = searchString;
+            this.fileOffset = fileOffset;
+        }
+
+        public int getFileOffset() {
+            return this.fileOffset;
+        }
+
+        public String getSearchString() {
+            return this.searchString;
+        }
+
+        public List<Map<String, Object>> getMatches() {
+            return this.matches;
+        }
+
+        /** Serializes this object with the shared {@link ObjectMapper}. */
+        @Override
+        public String toJSONString() {
+            final String json;
+            try {
+                json = OBJECT_MAPPER.writeValueAsString(this);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+            return json;
+        }
+    }
+
+    /**
+     * Outcome of searching one buffer: the matches collected so far plus the state needed to continue the
+     * search in the next buffer.
+     */
+    private static class SubstringSearchResult {
+        private final List<Map<String, Object>> matches;
+        private final Integer newByteOffset;
+        private final byte[] newBeforeBytes;
+
+        /**
+         * Creates a result.
+         *
+         * @param matches matches collected so far
+         * @param newByteOffset byte offset to continue from
+         * @param newBeforeBytes bytes to use as leading context in the next round
+         */
+        public SubstringSearchResult(List<Map<String, Object>> matches, Integer newByteOffset, byte[] newBeforeBytes) {
+            this.newBeforeBytes = newBeforeBytes;
+            this.newByteOffset = newByteOffset;
+            this.matches = matches;
+        }
+
+        public List<Map<String, Object>> getMatches() {
+            return this.matches;
+        }
+
+        public Integer getNewByteOffset() {
+            return this.newByteOffset;
+        }
+
+        public byte[] getNewBeforeBytes() {
+            return this.newBeforeBytes;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java
new file mode 100644
index 0000000..1cc274c
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/handler/LogviewerProfileHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.handler;
+
+import j2html.tags.DomContent;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.daemon.DirectoryCleaner;
+import org.apache.storm.daemon.wip.logviewer.utils.LogviewerResponseBuilder;
+import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.utils.ServerUtils;
+
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static j2html.TagCreator.a;
+import static j2html.TagCreator.body;
+import static j2html.TagCreator.head;
+import static j2html.TagCreator.html;
+import static j2html.TagCreator.li;
+import static j2html.TagCreator.link;
+import static j2html.TagCreator.title;
+import static j2html.TagCreator.ul;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Serves the profiler dump files found in a worker's log directory
+ * ({@code <logRoot>/<topologyId>/<port>}): an HTML listing and a download endpoint.
+ *
+ * <p>Access is decided by asking the {@link ResourceAuthorizer} whether the user may read the
+ * worker's {@code worker.log}.
+ */
+public class LogviewerProfileHandler {
+
+    public static final String WORKER_LOG_FILENAME = "worker.log";
+    private final String logRoot;
+    private final ResourceAuthorizer resourceAuthorizer;
+
+    /**
+     * Constructor.
+     *
+     * @param logRoot root directory under which the per-topology worker directories live
+     * @param resourceAuthorizer authorizer consulted before any content is returned
+     */
+    public LogviewerProfileHandler(String logRoot, ResourceAuthorizer resourceAuthorizer) {
+        this.logRoot = logRoot;
+        this.resourceAuthorizer = resourceAuthorizer;
+    }
+
+    /**
+     * Builds an HTML page that links to every dump file of one worker.
+     *
+     * @param topologyId topology id, used as directory name below the log root
+     * @param hostPort worker address in {@code host:port} form; only the port part selects the directory
+     * @param user name of the requesting user
+     * @return the listing page, an "unauthorized" page, or a 404 response when the worker directory
+     *     does not exist or {@code hostPort} carries no port
+     * @throws IOException if the directory cannot be listed
+     */
+    public Response listDumpFiles(String topologyId, String hostPort, String user) throws IOException {
+        String portStr = extractPort(hostPort);
+        if (portStr == null) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        File dir = new File(String.join(ServerUtils.FILE_PATH_SEPARATOR, logRoot, topologyId, portStr));
+        if (!dir.exists()) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        if (!isUserAllowedToAccessWorker(user, topologyId, portStr)) {
+            return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
+        }
+
+        String content = buildDumpFileListPage(topologyId, hostPort, dir);
+        return LogviewerResponseBuilder.buildSuccessHtmlResponse(content);
+    }
+
+    /**
+     * Renders the listing page: one list item per dump file, each linking to the download endpoint.
+     */
+    private String buildDumpFileListPage(String topologyId, String hostPort, File dir) throws IOException {
+        List<DomContent> liTags = getProfilerDumpFiles(dir).stream()
+                .map(file -> li(a(file).withHref("/api/v1/dumps/" + topologyId + "/" + hostPort + "/" + file)))
+                .collect(toList());
+
+        return html(
+                head(
+                        title("File Dumps - Storm Log Viewer"),
+                        link().withRel("stylesheet").withHref("/css/bootstrap-3.3.1.min.css"),
+                        link().withRel("stylesheet").withHref("/css/jquery.dataTables.1.10.4.min.css"),
+                        link().withRel("stylesheet").withHref("/css/style.css")
+                ),
+                body(
+                        ul(liTags.toArray(new DomContent[0]))
+                )
+        ).render();
+    }
+
+    /**
+     * Streams one dump file of a worker back to the client.
+     *
+     * @param topologyId topology id, used as directory name below the log root
+     * @param hostPort worker address in {@code host:port} form; only the port part selects the directory
+     * @param fileName name of the dump file inside the worker directory
+     * @param user name of the requesting user
+     * @return the download response, an "unauthorized" page, or a 404 response when the file does not
+     *     exist, lies outside the worker directory, or {@code hostPort} carries no port
+     * @throws IOException if paths cannot be resolved or the file cannot be opened
+     */
+    public Response downloadDumpFile(String topologyId, String hostPort, String fileName, String user) throws IOException {
+        String portStr = extractPort(hostPort);
+        if (portStr == null) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        File dir = new File(String.join(ServerUtils.FILE_PATH_SEPARATOR, logRoot, topologyId, portStr));
+        File file = new File(dir, fileName);
+
+        // fileName comes from the request: refuse anything (e.g. "../x") that resolves outside the worker dir.
+        if (!dir.exists() || !file.exists() || !isDirectChild(dir, file)) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        if (!isUserAllowedToAccessWorker(user, topologyId, portStr)) {
+            return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
+        }
+
+        return LogviewerResponseBuilder.buildDownloadFile(file);
+    }
+
+    /**
+     * Asks the authorizer whether the user may read the worker's log file.
+     * ResourceAuthorizer#isUserAllowedToAccessFile takes the file name first and the user second.
+     */
+    private boolean isUserAllowedToAccessWorker(String user, String topologyId, String portStr) {
+        String workerFileRelativePath = String.join(ServerUtils.FILE_PATH_SEPARATOR, topologyId, portStr,
+                WORKER_LOG_FILENAME);
+        return resourceAuthorizer.isUserAllowedToAccessFile(workerFileRelativePath, user);
+    }
+
+    /**
+     * Returns the port part of a {@code host:port} string, or null when there is none.
+     */
+    private static String extractPort(String hostPort) {
+        if (hostPort == null) {
+            return null;
+        }
+        String[] parts = hostPort.split(":");
+        return (parts.length > 1 && !parts[1].isEmpty()) ? parts[1] : null;
+    }
+
+    /**
+     * Returns true when the canonical location of {@code file} is directly inside {@code dir}.
+     */
+    private static boolean isDirectChild(File dir, File file) throws IOException {
+        return dir.getCanonicalFile().equals(file.getCanonicalFile().getParentFile());
+    }
+
+    /**
+     * Returns the names of the files in {@code dir} ending in {@code .txt}, {@code .jfr} or {@code .bin}.
+     */
+    private List<String> getProfilerDumpFiles(File dir) throws IOException {
+        List<File> filesForDir = DirectoryCleaner.getFilesForDir(dir);
+        return filesForDir.stream()
+                .map(File::getName)
+                .filter(fileName -> StringUtils.isNotEmpty(fileName)
+                        && (fileName.endsWith(".txt") || fileName.endsWith(".jfr") || fileName.endsWith(".bin")))
+                .collect(toList());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java
new file mode 100644
index 0000000..6aa1050
--- /dev/null
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogCleaner.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.DirectoryCleaner;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.jooq.lambda.Unchecked;
+import org.jooq.lambda.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.BinaryOperator;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS;
+import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS;
+import static 
org.apache.storm.DaemonConfig.LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB;
+import static 
org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB;
+
+/**
+ * Recurring task that keeps the worker log directories below the worker artifacts root in check.
+ *
+ * <p>Each run removes log directories that are older than the configured age and whose worker is no
+ * longer alive, then trims the oldest files per worker directory and globally until the configured
+ * size limits are met.
+ */
+public class LogCleaner implements Runnable, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
+    public static final String WORKER_YAML = "worker.yaml";
+
+    private final Map<String, Object> stormConf;
+    private final Integer intervalSecs;
+    private final String logRootDir;
+    private StormTimer logviewerCleanupTimer;
+    private final long maxSumWorkerLogsSizeMb;
+    private final long maxPerWorkerLogsSizeMb;
+
+    /**
+     * Constructor. Reads the cleanup interval and the size limits from the configuration.
+     * The per-worker limit is capped at half of the global limit.
+     *
+     * @param stormConf daemon configuration
+     */
+    public LogCleaner(Map<String, Object> stormConf) {
+        this.stormConf = stormConf;
+        this.intervalSecs = ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_INTERVAL_SECS), null);
+        this.logRootDir = ConfigUtils.workerArtifactsRoot(stormConf);
+
+        this.maxSumWorkerLogsSizeMb = ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB));
+        long configuredPerWorkerMb = ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB));
+        this.maxPerWorkerLogsSizeMb = Math.min(configuredPerWorkerMb, (long) (maxSumWorkerLogsSizeMb * 0.5));
+
+        LOG.info("configured max total size of worker logs: {} MB, max total size of worker logs per directory: {} MB",
+                maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb);
+    }
+
+    /**
+     * Schedules this cleaner on its own timer; does nothing but warn when no interval is configured.
+     */
+    public void start() {
+        if (intervalSecs != null) {
+            LOG.debug("starting log cleanup thread at interval: {}", intervalSecs);
+
+            logviewerCleanupTimer = new StormTimer("logviewer-cleanup", (t, e) -> {
+                LOG.error("Error when doing logs cleanup", e);
+                Utils.exitProcess(20, "Error when doing log cleanup");
+            });
+
+            logviewerCleanupTimer.scheduleRecurring(0, intervalSecs, this);
+        } else {
+            LOG.warn("The interval for log cleanup is not set. Skip starting log cleanup thread.");
+        }
+
+    }
+
+    /**
+     * Stops the cleanup timer if it was started.
+     */
+    @Override
+    public void close() {
+        if (logviewerCleanupTimer != null) {
+            try {
+                logviewerCleanupTimer.close();
+            } catch (Exception ex) {
+                throw Utils.wrapInRuntime(ex);
+            }
+        }
+    }
+
+    /**
+     * Delete old log dirs for which the workers are no longer alive, then enforce the size limits.
+     * Any failure is logged and swallowed so that the next scheduled run still happens.
+     */
+    @Override
+    public void run() {
+        try {
+            int nowSecs = Time.currentTimeSecs();
+            // Multiply as long: epoch seconds * 1000 does not fit into an int.
+            Set<File> oldLogDirs = selectDirsForCleanup(nowSecs * 1000L, logRootDir);
+
+            DirectoryCleaner cleaner = new DirectoryCleaner();
+            SortedSet<File> deadWorkerDirs = getDeadWorkerDirs(nowSecs, oldLogDirs);
+
+            LOG.debug("log cleanup: now={} old log dirs {} dead worker dirs {}", nowSecs,
+                    oldLogDirs.stream().map(File::getName).collect(joining(",")),
+                    deadWorkerDirs.stream().map(File::getName).collect(joining(",")));
+
+            deadWorkerDirs.forEach(Unchecked.consumer(dir -> {
+                String path = dir.getCanonicalPath();
+                LOG.info("Cleaning up: Removing {}", path);
+
+                try {
+                    Utils.forceDelete(path);
+                    cleanupEmptyTopoDirectory(dir);
+                } catch (Exception ex) {
+                    LOG.error(ex.getMessage(), ex);
+                }
+            }));
+
+            perWorkerDirCleanup(new File(logRootDir), maxPerWorkerLogsSizeMb * 1024 * 1024, cleaner);
+            globalLogCleanup(new File(logRootDir), maxSumWorkerLogsSizeMb * 1024 * 1024, cleaner);
+        } catch (Exception ex) {
+            LOG.error("Exception while cleaning up old log.", ex);
+        }
+    }
+
+    /**
+     * Delete the oldest files in each overloaded worker log dir.
+     */
+    private void perWorkerDirCleanup(File rootDir, long size, DirectoryCleaner cleaner) {
+        WorkerLogs.getAllWorkerDirs(rootDir).forEach(Unchecked.consumer(dir -> {
+            cleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), size, true, null);
+        }));
+    }
+
+    /**
+     * Delete the oldest files in overloaded worker-artifacts globally.
+     */
+    private void globalLogCleanup(File rootDir, long size, DirectoryCleaner cleaner) throws Exception {
+        List<File> workerDirs = new ArrayList<>(WorkerLogs.getAllWorkerDirs(rootDir));
+        Set<String> aliveWorkerDirs = new HashSet<>(getAliveWorkerDirs(rootDir));
+
+        cleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs);
+    }
+
+    /**
+     * Return a sorted set of the canonical paths of the log dirs written by workers that are now active.
+     */
+    private SortedSet<String> getAliveWorkerDirs(File rootDir) throws Exception {
+        Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
+        SortedSet<String> aliveDirs = new TreeSet<>();
+
+        for (File logDir : WorkerLogs.getAllWorkerDirs(rootDir)) {
+            if (aliveIds.contains(getWorkerIdForLogDir(logDir))) {
+                aliveDirs.add(logDir.getCanonicalPath());
+            }
+        }
+        return aliveDirs;
+    }
+
+    /**
+     * Return the worker id recorded in the metadata file of the given log dir.
+     * Returns the empty string when there is no metadata file or it holds no worker id, so that several
+     * such directories can be handled in one run and the result is always safe to look up in a sorted set.
+     */
+    private String getWorkerIdForLogDir(File logDir) throws IOException {
+        Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
+        if (!metaFile.isPresent()) {
+            return "";
+        }
+
+        String workerId = getWorkerIdFromMetadataFile(metaFile.get().getCanonicalPath());
+        return workerId != null ? workerId : "";
+    }
+
+    private Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws IOException {
+        File metaFile = new File(logDir, WORKER_YAML);
+        if (metaFile.exists()) {
+            return Optional.of(metaFile);
+        } else {
+            LOG.warn("Could not find {} to clean up for {}", metaFile.getCanonicalPath(), logDir);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Read the "worker-id" entry of the given YAML file; null when the file yields no map or no such entry.
+     */
+    private String getWorkerIdFromMetadataFile(String metaFile) {
+        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile);
+        if (map == null) {
+            return null;
+        }
+        return ObjectReader.getString(map.get("worker-id"), null);
+    }
+
+    /**
+     * Return the ids of the workers that have a heartbeat which has not timed out at {@code nowSecs}.
+     */
+    private Set<String> getAliveIds(int nowSecs) throws Exception {
+        return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
+                .filter(entry -> Objects.nonNull(entry.getValue())
+                        && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
+                .map(Map.Entry::getKey)
+                .collect(toCollection(TreeSet::new));
+    }
+
+    /**
+     * Delete the topo dir if it contains zero port dirs.
+     */
+    private void cleanupEmptyTopoDirectory(File dir) throws IOException {
+        File topoDir = dir.getParentFile();
+        if (topoDir == null) {
+            return;
+        }
+
+        // listFiles() yields null when the directory vanished or cannot be read: nothing to delete then.
+        File[] remaining = topoDir.listFiles();
+        if (remaining != null && remaining.length == 0) {
+            Utils.forceDelete(topoDir.getCanonicalPath());
+        }
+    }
+
+    /**
+     * Return a sorted set of the given log dirs that were written by workers that are now dead.
+     * A dir without a usable worker id counts as belonging to no live worker.
+     */
+    private SortedSet<File> getDeadWorkerDirs(int nowSecs, Set<File> logDirs) throws Exception {
+        SortedSet<File> deadDirs = new TreeSet<>();
+        if (logDirs.isEmpty()) {
+            return deadDirs;
+        }
+
+        Set<String> aliveIds = getAliveIds(nowSecs);
+        for (File logDir : logDirs) {
+            if (!aliveIds.contains(getWorkerIdForLogDir(logDir))) {
+                deadDirs.add(logDir);
+            }
+        }
+        return deadDirs;
+    }
+
+    /**
+     * Return the port dirs (second level below {@code rootDir}) that have not been modified since the cutoff.
+     */
+    private Set<File> selectDirsForCleanup(long nowMillis, String rootDir) {
+        FileFilter fileFilter = mkFileFilterForLogCleanup(nowMillis);
+        Set<File> selected = new TreeSet<>();
+
+        File[] topoDirs = new File(rootDir).listFiles();
+        if (topoDirs == null) {
+            return selected;
+        }
+
+        for (File topoDir : topoDirs) {
+            // null for plain files in the root dir or unreadable directories
+            File[] portDirs = topoDir.listFiles(fileFilter);
+            if (portDirs != null) {
+                Collections.addAll(selected, portDirs);
+            }
+        }
+        return selected;
+    }
+
+    private FileFilter mkFileFilterForLogCleanup(long nowMillis) {
+        final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
+        return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
+    }
+
+    /**
+     * Return the most recent modification time among a worker's log dir and the entries in it.
+     * Using stream rather than File.listFiles is to avoid large mem usage
+     * when a directory has too many files.
+     * Falls back to the modification time of the dir itself when it cannot be read.
+     */
+    private long lastModifiedTimeWorkerLogdir(File logDir) {
+        long dirModified = logDir.lastModified();
+
+        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(logDir.toPath())) {
+            return StreamSupport.stream(dirStream.spliterator(), false)
+                    .mapToLong(path -> path.toFile().lastModified())
+                    .reduce(dirModified, Math::max);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            return dirModified;
+        }
+    }
+
+    /**
+     * Return the timestamp before which a log dir counts as old: now minus the configured age,
+     * which is given in minutes and therefore converted to milliseconds.
+     */
+    private long cleanupCutoffAgeMillis(long nowMillis) {
+        long cleanupAgeMins = ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_AGE_MINS));
+        return nowMillis - cleanupAgeMins * 60L * 1000L;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java
new file mode 100644
index 0000000..dcb2e2c
--- /dev/null
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogFileDownloader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.utils;
+
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Builds download responses for worker log files and daemon log files.
+ */
+public class LogFileDownloader {
+
+    private final String logRoot;
+    private final String daemonLogRoot;
+    private final ResourceAuthorizer resourceAuthorizer;
+
+    /**
+     * Constructor.
+     *
+     * @param logRoot root directory of the worker logs
+     * @param daemonLogRoot root directory of the daemon logs
+     * @param resourceAuthorizer authorizer consulted for worker log files
+     */
+    public LogFileDownloader(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) {
+        this.logRoot = logRoot;
+        this.daemonLogRoot = daemonLogRoot;
+        this.resourceAuthorizer = resourceAuthorizer;
+    }
+
+    /**
+     * Returns a response that streams the requested log file.
+     *
+     * @param fileName path of the file relative to the selected root directory
+     * @param user name of the requesting user
+     * @param isDaemon true to look below the daemon log root, false for the worker log root;
+     *     daemon files are served without consulting the authorizer
+     * @return the download response, an "unauthorized" page, or a 404 response when the file does not
+     *     exist or resolves to a location outside the selected root directory
+     * @throws IOException if paths cannot be resolved or the file cannot be opened
+     */
+    public Response downloadFile(String fileName, String user, boolean isDaemon) throws IOException {
+        File rootDir = new File(isDaemon ? daemonLogRoot : logRoot).getCanonicalFile();
+        File file = new File(rootDir, fileName).getCanonicalFile();
+
+        // fileName comes from the request: "../" segments must never lead outside the log root.
+        boolean insideRoot = file.toPath().startsWith(rootDir.toPath());
+        if (!insideRoot || !file.exists()) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) {
+            return LogviewerResponseBuilder.buildDownloadFile(file);
+        } else {
+            return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java
new file mode 100644
index 0000000..bc60e09
--- /dev/null
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/LogviewerResponseBuilder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.utils;
+
+import com.google.common.io.ByteStreams;
+import org.apache.storm.daemon.common.JsonResponseBuilder;
+import org.apache.storm.ui.UIHelpers;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static j2html.TagCreator.body;
+import static j2html.TagCreator.h2;
+import static javax.ws.rs.core.Response.Status.OK;
+import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
+
+/**
+ * Static factory methods for the HTTP responses returned by the logviewer.
+ */
+public class LogviewerResponseBuilder {
+
+    private LogviewerResponseBuilder() {
+    }
+
+    /**
+     * Builds a 200 response carrying the given HTML content.
+     */
+    public static Response buildSuccessHtmlResponse(String content) {
+        return Response.status(OK).entity(content)
+                .type(MediaType.TEXT_HTML_TYPE).build();
+    }
+
+    /**
+     * Builds a JSON response for the given entity with the CORS headers for {@code origin}.
+     *
+     * @param entity data handed to the JSON response builder
+     * @param callback callback name handed to the JSON response builder
+     * @param origin value sent back as {@code Access-Control-Allow-Origin}
+     */
+    public static Response buildSuccessJsonResponse(Object entity, String callback, String origin) {
+        return new JsonResponseBuilder().setData(entity).setCallback(callback)
+                .setHeaders(LogviewerResponseBuilder.getHeadersForSuccessResponse(origin)).build();
+    }
+
+    /**
+     * Builds a 200 response that streams the given file as an attachment.
+     *
+     * @param file file to send
+     * @throws IOException if the file cannot be opened
+     */
+    public static Response buildDownloadFile(File file) throws IOException {
+        // Do not close this InputStream here: the server reads it later, when it writes the entity.
+        // The streaming wrapper closes it once the content has been copied.
+        InputStream is = new FileInputStream(file);
+        return Response.status(OK)
+                .entity(wrapWithStreamingOutput(is))
+                .type(MediaType.APPLICATION_OCTET_STREAM_TYPE)
+                .header("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"")
+                .build();
+    }
+
+    /**
+     * Builds an HTML response telling the user that they are not authorized. The status code is 200.
+     */
+    public static Response buildResponseUnautohrizedUser(String user) {
+        String entity = buildUnauthorizedUserHtml(user);
+        return Response.status(OK)
+                .entity(entity)
+                .type(MediaType.TEXT_HTML_TYPE)
+                .build();
+    }
+
+    /**
+     * Builds a 404 response with the body "Page not found".
+     */
+    public static Response buildResponsePageNotFound() {
+        return Response.status(404)
+                .entity("Page not found")
+                .type(MediaType.TEXT_HTML_TYPE)
+                .build();
+    }
+
+    /**
+     * Builds a 401 JSON response for an unauthorized user.
+     */
+    public static Response buildUnauthorizedUserJsonResponse(String user, String callback) {
+        return new JsonResponseBuilder().setData(UIHelpers.unauthorizedUserJson(user))
+                .setCallback(callback).setStatus(401).build();
+    }
+
+    /**
+     * Builds a 500 JSON response describing the given exception.
+     */
+    public static Response buildExceptionJsonResponse(Exception ex, String callback) {
+        return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(ex))
+                .setCallback(callback).setStatus(500).build();
+    }
+
+    private static Map<String, Object> getHeadersForSuccessResponse(String origin) {
+        Map<String, Object> headers = new HashMap<>();
+        headers.put("Access-Control-Allow-Origin", origin);
+        headers.put("Access-Control-Allow-Credentials", "true");
+        return headers;
+    }
+
+    private static String buildUnauthorizedUserHtml(String user) {
+        // NOTE(review): the user name is escaped here and handed to h2(String); if the HTML builder
+        // escapes text on its own, the name is escaped twice - confirm against the j2html version in use.
+        String content = "User '" + escapeHtml(user) + "' is not authorized.";
+        return body(h2(content)).render();
+    }
+
+    /**
+     * Wraps the stream into a StreamingOutput that copies it to the response and then closes it.
+     * Closing happens on failure as well, so that no file handle stays open per download.
+     */
+    private static StreamingOutput wrapWithStreamingOutput(final InputStream inputStream) {
+        return os -> {
+            try (InputStream source = inputStream) {
+                OutputStream wrappedOutputStream = os;
+                if (!(os instanceof BufferedOutputStream)) {
+                    wrappedOutputStream = new BufferedOutputStream(os);
+                }
+
+                ByteStreams.copy(source, wrappedOutputStream);
+
+                wrappedOutputStream.flush();
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java
new file mode 100644
index 0000000..c7a45e7
--- /dev/null
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/ResourceAuthorizer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.utils;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IGroupMappingServiceProvider;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.utils.Utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.storm.DaemonConfig.UI_FILTER;
+
+/**
+ * Decides whether a user may read a log file, based on the daemon configuration and on the
+ * user/group whitelist stored in the log's metadata file.
+ */
+public class ResourceAuthorizer {
+
+    private final Map<String, Object> stormConf;
+    private final IGroupMappingServiceProvider groupMappingServiceProvider;
+
+    /**
+     * Constructor.
+     *
+     * @param stormConf daemon configuration
+     */
+    public ResourceAuthorizer(Map<String, Object> stormConf) {
+        this.stormConf = stormConf;
+        this.groupMappingServiceProvider = AuthUtils.GetGroupMappingServiceProviderPlugin(stormConf);
+    }
+
+    /**
+     * Checks whether the user may access the file. Note the parameter order: file name first, user second.
+     *
+     * @param fileName file name of the log, as understood by the log metadata lookup
+     * @param user name of the requesting user
+     * @return true if no UI filter is configured, or if the user is authorized for the file
+     */
+    public boolean isUserAllowedToAccessFile(String fileName, String user) {
+        return isUiFilterNotSet() || isAuthorizedLogUser(user, fileName);
+    }
+
+    private boolean isUiFilterNotSet() {
+        return StringUtils.isBlank(ObjectReader.getString(stormConf.get(UI_FILTER), null));
+    }
+
+    /**
+     * Returns true when the user is listed as a logs user, a nimbus admin or in the file's user whitelist,
+     * or when at least one of the user's groups is a logs group or in the file's group whitelist.
+     * Returns false for an empty user or file name, or when the file has no whitelist metadata.
+     */
+    private boolean isAuthorizedLogUser(String user, String fileName) {
+        if (StringUtils.isEmpty(user) || StringUtils.isEmpty(fileName)) {
+            return false;
+        }
+
+        // Read the metadata once; it is backed by a YAML file on disk.
+        LogUserGroupWhitelist whitelist = getLogUserGroupWhitelist(fileName);
+        if (whitelist == null) {
+            return false;
+        }
+
+        List<String> logsUsers = new ArrayList<>();
+        logsUsers.addAll(ObjectReader.getStrings(stormConf.get(DaemonConfig.LOGS_USERS)));
+        logsUsers.addAll(ObjectReader.getStrings(stormConf.get(Config.NIMBUS_ADMINS)));
+        logsUsers.addAll(whitelist.getUserWhitelist());
+        if (logsUsers.contains(user)) {
+            return true;
+        }
+
+        Set<String> logsGroups = new HashSet<>();
+        logsGroups.addAll(ObjectReader.getStrings(stormConf.get(DaemonConfig.LOGS_GROUPS)));
+        logsGroups.addAll(whitelist.getGroupWhitelist());
+
+        // Authorized through a group as soon as the user shares at least one group with the allowed ones.
+        return !Sets.intersection(getUserGroups(user), logsGroups).isEmpty();
+    }
+
+    /**
+     * Reads the user and group whitelist from the metadata file of the given log file.
+     *
+     * @param fileName file name of the log
+     * @return the whitelist, or null when the metadata file yields no content
+     */
+    public LogUserGroupWhitelist getLogUserGroupWhitelist(String fileName) {
+        File wlFile = ServerConfigUtils.getLogMetaDataFile(fileName);
+        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(wlFile.getAbsolutePath());
+
+        if (map == null) {
+            return null;
+        }
+
+        List<String> logsUsers = ObjectReader.getStrings(map.get(DaemonConfig.LOGS_USERS));
+        List<String> logsGroups = ObjectReader.getStrings(map.get(DaemonConfig.LOGS_GROUPS));
+        return new LogUserGroupWhitelist(new HashSet<>(logsUsers), new HashSet<>(logsGroups));
+    }
+
+    /**
+     * Returns the groups of the user as reported by the group mapping plugin; empty for an empty user.
+     */
+    private Set<String> getUserGroups(String user) {
+        try {
+            if (StringUtils.isEmpty(user)) {
+                return new HashSet<>();
+            } else {
+                return groupMappingServiceProvider.getGroups(user);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Pair of the users and the groups that are whitelisted for one log file.
+     */
+    public static class LogUserGroupWhitelist {
+
+        private final Set<String> userWhitelist;
+        private final Set<String> groupWhitelist;
+
+        public LogUserGroupWhitelist(Set<String> userWhitelist, Set<String> groupWhitelist) {
+            this.userWhitelist = userWhitelist;
+            this.groupWhitelist = groupWhitelist;
+        }
+
+        public Set<String> getUserWhitelist() {
+            return userWhitelist;
+        }
+
+        public Set<String> getGroupWhitelist() {
+            return groupWhitelist;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java
new file mode 100644
index 0000000..c49a4a1
--- /dev/null
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/utils/WorkerLogs.java
@@ -0,0 +1,63 @@
+package org.apache.storm.daemon.wip.logviewer.utils;
+
+import org.apache.storm.daemon.DirectoryCleaner;
+import org.apache.storm.utils.Utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toCollection;
+import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast;
+
+/**
+ * Static helpers for locating worker log files and directories below a log root directory.
+ */
+public class WorkerLogs {
+
+    private WorkerLogs() {
+    }
+
+    /**
+     * Return the path of the worker log with the format of topoId/port/worker.log.*
+     *
+     * <p>The canonical path of the file is split on {@link Utils#FILE_PATH_SEPARATOR}, the components
+     * selected by {@code takeLast(..., 3)} are kept, and they are re-joined with the same separator.
+     *
+     * @param file worker log file
+     * @return the trailing part of the canonical path
+     * @throws RuntimeException wrapping the IOException if the canonical path cannot be resolved
+     */
+    public static String getTopologyPortWorkerLog(File file) {
+        try {
+            // String.split() treats its argument as a regular expression, so the separator has to be
+            // quoted: a backslash separator would otherwise be rejected as an invalid pattern.
+            String separatorPattern = java.util.regex.Pattern.quote(Utils.FILE_PATH_SEPARATOR);
+            String[] pathParts = file.getCanonicalPath().split(separatorPattern);
+            List<String> trailingParts = takeLast(Arrays.asList(pathParts), 3);
+
+            return String.join(Utils.FILE_PATH_SEPARATOR, trailingParts);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Collects the files of every worker directory found below the given log root.
+     *
+     * @param logDir log root directory
+     * @return the concatenation of {@code DirectoryCleaner.getFilesForDir} over all entries returned by
+     *     {@link #getAllWorkerDirs(File)}; empty when there are none
+     * @throws IOException propagated from {@code DirectoryCleaner.getFilesForDir}
+     */
+    public static List<File> getAllLogsForRootDir(File logDir) throws IOException {
+        List<File> files = new ArrayList<>();
+        // getAllWorkerDirs always returns a (possibly empty) set, never null.
+        for (File portDir : getAllWorkerDirs(logDir)) {
+            files.addAll(DirectoryCleaner.getFilesForDir(portDir));
+        }
+
+        return files;
+    }
+
+    /**
+     * Lists every entry located exactly two levels below the given root directory, i.e. the children
+     * of the root's children.
+     *
+     * @param rootDir root directory to scan
+     * @return a sorted set of the entries found; empty when the root (or a child) cannot be listed
+     */
+    public static Set<File> getAllWorkerDirs(File rootDir) {
+        File[] rootDirFiles = rootDir.listFiles();
+        if (rootDirFiles == null) {
+            // listFiles() yields null when rootDir is not a directory or cannot be read.
+            return new TreeSet<>();
+        }
+
+        return Arrays.stream(rootDirFiles).flatMap(topoDir -> {
+            File[] topoFiles = topoDir.listFiles();
+            return topoFiles != null ? Arrays.stream(topoFiles) : Stream.empty();
+        }).collect(toCollection(TreeSet::new));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java
new file mode 100644
index 0000000..c1b4bc7
--- /dev/null
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerApplication.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.webapp;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.storm.daemon.common.AuthorizationExceptionMapper;
+import 
org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogDownloadHandler;
+import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogPageHandler;
+import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogSearchHandler;
+import org.apache.storm.daemon.wip.logviewer.handler.LogviewerProfileHandler;
+import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+
+import javax.ws.rs.ApplicationPath;
+import javax.ws.rs.core.Application;
+import java.io.File;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.storm.DaemonConfig.LOGVIEWER_APPENDER_NAME;
+
+/**
+ * JAX-RS application that wires the logviewer handlers into a {@link LogviewerResource} and exposes
+ * it, together with an {@link AuthorizationExceptionMapper}, as singletons.
+ *
+ * <p>The storm configuration is held in a static field, so {@link #setup(Map)} has to be called
+ * before an instance is constructed.
+ */
+@ApplicationPath("")
+public class LogviewerApplication extends Application {
+    private static Map<String, Object> stormConf;
+    private final Set<Object> singletons = new HashSet<>();
+
+    /**
+     * Constructor.
+     *
+     * <p>Reads the log roots from the configuration supplied via {@link #setup(Map)}, builds the
+     * handlers and registers the resource and the exception mapper.
+     */
+    public LogviewerApplication() {
+        String workerLogRoot = ConfigUtils.workerArtifactsRoot(stormConf);
+        String appenderName = ObjectReader.getString(stormConf.get(LOGVIEWER_APPENDER_NAME));
+        String daemonLogRoot = logRootDir(appenderName);
+
+        ResourceAuthorizer authorizer = new ResourceAuthorizer(stormConf);
+
+        LogviewerLogPageHandler pageHandler = new LogviewerLogPageHandler(workerLogRoot, daemonLogRoot, authorizer);
+        LogviewerProfileHandler profileHandler = new LogviewerProfileHandler(workerLogRoot, authorizer);
+        LogviewerLogDownloadHandler downloadHandler =
+                new LogviewerLogDownloadHandler(workerLogRoot, daemonLogRoot, authorizer);
+        LogviewerLogSearchHandler searchHandler =
+                new LogviewerLogSearchHandler(stormConf, workerLogRoot, daemonLogRoot, authorizer);
+        IHttpCredentialsPlugin credentialsPlugin = AuthUtils.GetUiHttpCredentialsPlugin(stormConf);
+
+        LogviewerResource resource =
+                new LogviewerResource(pageHandler, profileHandler, downloadHandler, searchHandler, credentialsPlugin);
+        singletons.add(resource);
+        singletons.add(new AuthorizationExceptionMapper());
+    }
+
+    @Override
+    public Set<Object> getSingletons() {
+        return singletons;
+    }
+
+    /**
+     * Stores the storm configuration used by subsequently constructed instances.
+     *
+     * @param stormConf storm configuration map
+     */
+    public static void setup(Map<String, Object> stormConf) {
+        LogviewerApplication.stormConf = stormConf;
+    }
+
+    /**
+     * Given an appender name, as configured, get the parent directory of the appender's log file.
+     *
+     * @param appenderName name of the log4j appender to look up
+     * @return parent directory of the file the appender writes to
+     * @throws RuntimeException if the name is null, no such appender exists, or the appender is not a
+     *     {@link RollingFileAppender}
+     */
+    private String logRootDir(String appenderName) {
+        LoggerContext loggerContext = (LoggerContext) LogManager.getContext();
+        Appender appender = loggerContext.getConfiguration().getAppender(appenderName);
+
+        // instanceof is false for null, so it also covers the "appender not found" case.
+        boolean usable = appenderName != null && appender instanceof RollingFileAppender;
+        if (!usable) {
+            throw new RuntimeException("Log viewer could not find configured appender, or the appender is not a FileAppender. "
+                    + "Please check that the appender name configured in storm and log4j agree.");
+        }
+
+        String logFileName = ((RollingFileAppender) appender).getFileName();
+        return new File(logFileName).getParent();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11a79053/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java
new file mode 100644
index 0000000..fd8ea97
--- /dev/null
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/wip/logviewer/webapp/LogviewerResource.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.wip.logviewer.webapp;
+
+import com.codahale.metrics.Meter;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.daemon.common.JsonResponseBuilder;
+import 
org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogDownloadHandler;
+import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogPageHandler;
+import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogSearchHandler;
+import org.apache.storm.daemon.wip.logviewer.handler.LogviewerProfileHandler;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.ui.InvalidRequestException;
+import org.apache.storm.ui.UIHelpers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.Map;
+
+/**
+ * JAX-RS resource exposing the logviewer HTTP GET endpoints. Each endpoint resolves the requesting
+ * user through the configured {@link IHttpCredentialsPlugin}, extracts its parameters from the
+ * request and delegates to one of the logviewer handlers.
+ */
+@Path("/")
+public class LogviewerResource {
+    private static final Logger LOG = LoggerFactory.getLogger(LogviewerResource.class);
+
+    private static final Meter meterLogPageHttpRequests =
+            StormMetricsRegistry.registerMeter("logviewer:num-log-page-http-requests");
+    private static final Meter meterDaemonLogPageHttpRequests =
+            StormMetricsRegistry.registerMeter("logviewer:num-daemonlog-page-http-requests");
+    private static final Meter meterDownloadLogFileHttpRequests =
+            StormMetricsRegistry.registerMeter("logviewer:num-download-log-file-http-requests");
+    private static final Meter meterDownloadLogDaemonFileHttpRequests =
+            StormMetricsRegistry.registerMeter("logviewer:num-download-log-daemon-file-http-requests");
+    private static final Meter meterListLogsHttpRequests =
+            StormMetricsRegistry.registerMeter("logviewer:num-list-logs-http-requests");
+
+    /** Character set used to decode file names received in requests. */
+    private static final String FILE_NAME_ENCODING = "UTF-8";
+
+    private final LogviewerLogPageHandler logviewer;
+    private final LogviewerProfileHandler profileHandler;
+    private final LogviewerLogDownloadHandler logDownloadHandler;
+    private final LogviewerLogSearchHandler logSearchHandler;
+    private final IHttpCredentialsPlugin httpCredsHandler;
+
+    /**
+     * Constructor.
+     *
+     * @param logviewerParam handler for log page and log listing requests
+     * @param profileHandler handler for dump file requests
+     * @param logDownloadHandler handler for log download requests
+     * @param logSearchHandler handler for log search requests
+     * @param httpCredsHandler plugin used to obtain the user name from a request
+     */
+    public LogviewerResource(LogviewerLogPageHandler logviewerParam, LogviewerProfileHandler profileHandler,
+                             LogviewerLogDownloadHandler logDownloadHandler, LogviewerLogSearchHandler logSearchHandler,
+                             IHttpCredentialsPlugin httpCredsHandler) {
+        this.logviewer = logviewerParam;
+        this.profileHandler = profileHandler;
+        this.logDownloadHandler = logDownloadHandler;
+        this.logSearchHandler = logSearchHandler;
+        this.httpCredsHandler = httpCredsHandler;
+    }
+
+    /**
+     * Handles '/log': renders a page of a worker log file.
+     *
+     * <p>Query parameters: {@code file} (required), {@code start}, {@code length}, {@code grep}.
+     * Responds with status 400 when {@code file} is missing or a numeric parameter is not an integer.
+     */
+    @GET
+    @Path("/log")
+    public Response log(@Context HttpServletRequest request) throws IOException {
+        meterLogPageHttpRequests.mark();
+
+        try {
+            String user = httpCredsHandler.getUserName(request);
+            Integer start = request.getParameter("start") != null
+                    ? parseIntegerFromMap(request.getParameterMap(), "start") : null;
+            Integer length = request.getParameter("length") != null
+                    ? parseIntegerFromMap(request.getParameterMap(), "length") : null;
+            String file = request.getParameter("file");
+            if (file == null) {
+                return missingParameter("file");
+            }
+            String decodedFileName = decodeFileName(file);
+            String grep = request.getParameter("grep");
+            return logviewer.logPage(decodedFileName, start, length, grep, user);
+        } catch (InvalidRequestException e) {
+            LOG.error(e.getMessage(), e);
+            return Response.status(400).entity(e.getMessage()).build();
+        }
+    }
+
+    /**
+     * Handles '/daemonlog': renders a page of a daemon log file.
+     *
+     * <p>Query parameters: {@code file} (required), {@code start}, {@code length}, {@code grep}.
+     * Responds with status 400 when {@code file} is missing or a numeric parameter is not an integer.
+     */
+    @GET
+    @Path("/daemonlog")
+    public Response daemonLog(@Context HttpServletRequest request) throws IOException {
+        meterDaemonLogPageHttpRequests.mark();
+
+        try {
+            String user = httpCredsHandler.getUserName(request);
+            Integer start = request.getParameter("start") != null
+                    ? parseIntegerFromMap(request.getParameterMap(), "start") : null;
+            Integer length = request.getParameter("length") != null
+                    ? parseIntegerFromMap(request.getParameterMap(), "length") : null;
+            String file = request.getParameter("file");
+            if (file == null) {
+                return missingParameter("file");
+            }
+            String decodedFileName = decodeFileName(file);
+            String grep = request.getParameter("grep");
+            return logviewer.daemonLogPage(decodedFileName, start, length, grep, user);
+        } catch (InvalidRequestException e) {
+            LOG.error(e.getMessage(), e);
+            return Response.status(400).entity(e.getMessage()).build();
+        }
+    }
+
+    /**
+     * Handles '/searchLogs'. Delegates to the log file listing of the page handler, using the
+     * {@code topoId}, {@code port} and {@code callback} query parameters and the 'Origin' header.
+     */
+    @GET
+    @Path("/searchLogs")
+    public Response searchLogs(@Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        String topologyId = request.getParameter("topoId");
+        String portStr = request.getParameter("port");
+        String callback = request.getParameter("callback");
+        String origin = request.getHeader("Origin");
+
+        return logviewer.listLogFiles(user, portStr != null ? Integer.parseInt(portStr) : null, topologyId,
+                callback, origin);
+    }
+
+    /**
+     * Handles '/listLogs': lists log files, optionally narrowed by the {@code topoId} and {@code port}
+     * query parameters.
+     */
+    @GET
+    @Path("/listLogs")
+    public Response listLogs(@Context HttpServletRequest request) throws IOException {
+        meterListLogsHttpRequests.mark();
+
+        String user = httpCredsHandler.getUserName(request);
+        String topologyId = request.getParameter("topoId");
+        String portStr = request.getParameter("port");
+        String callback = request.getParameter("callback");
+        String origin = request.getHeader("Origin");
+
+        return logviewer.listLogFiles(user, portStr != null ? Integer.parseInt(portStr) : null, topologyId,
+                callback, origin);
+    }
+
+    /**
+     * Handles '/dumps/{topo-id}/{host-port}': lists the dump files for the given topology and host-port.
+     */
+    @GET
+    @Path("/dumps/{topo-id}/{host-port}")
+    public Response listDumpFiles(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
+                                  @Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        return profileHandler.listDumpFiles(topologyId, hostPort, user);
+    }
+
+    /**
+     * Handles '/dumps/{topo-id}/{host-port}/{filename}': downloads a single dump file.
+     */
+    @GET
+    @Path("/dumps/{topo-id}/{host-port}/{filename}")
+    public Response downloadDumpFile(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
+                                     @PathParam("filename") String fileName, @Context HttpServletRequest request)
+            throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user);
+    }
+
+    /**
+     * Handles '/download': downloads the worker log file named by the required {@code file} query
+     * parameter. Responds with status 400 when the parameter is missing.
+     */
+    @GET
+    @Path("/download")
+    public Response downloadLogFile(@Context HttpServletRequest request) throws IOException {
+        meterDownloadLogFileHttpRequests.mark();
+
+        String user = httpCredsHandler.getUserName(request);
+        String file = request.getParameter("file");
+        if (file == null) {
+            return missingParameter("file");
+        }
+        String decodedFileName = decodeFileName(file);
+        return logDownloadHandler.downloadLogFile(decodedFileName, user);
+    }
+
+    /**
+     * Handles '/daemondownload': downloads the daemon log file named by the required {@code file}
+     * query parameter. Responds with status 400 when the parameter is missing.
+     */
+    @GET
+    @Path("/daemondownload")
+    public Response downloadDaemonLogFile(@Context HttpServletRequest request) throws IOException {
+        meterDownloadLogDaemonFileHttpRequests.mark();
+
+        String user = httpCredsHandler.getUserName(request);
+        String file = request.getParameter("file");
+        if (file == null) {
+            return missingParameter("file");
+        }
+        String decodedFileName = decodeFileName(file);
+        return logDownloadHandler.downloadDaemonLogFile(decodedFileName, user);
+    }
+
+    /**
+     * Handles '/search/{file}': searches a single log file.
+     *
+     * <p>Query parameters: {@code is-daemon} ("yes" selects daemon logs), {@code search-string},
+     * {@code num-matches}, {@code start-byte-offset}, {@code callback}. An invalid request is answered
+     * with a JSON body and status 400.
+     */
+    @GET
+    @Path("/search/{file}")
+    public Response search(@PathParam("file") String file, @Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        boolean isDaemon = StringUtils.equals(request.getParameter("is-daemon"), "yes");
+        String decodedFileName = decodeFileName(file);
+        String searchString = request.getParameter("search-string");
+        String numMatchesStr = request.getParameter("num-matches");
+        String startByteOffset = request.getParameter("start-byte-offset");
+        String callback = request.getParameter("callback");
+        String origin = request.getHeader("Origin");
+
+        try {
+            return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon, searchString, numMatchesStr,
+                    startByteOffset, callback, origin);
+        } catch (InvalidRequestException e) {
+            LOG.error(e.getMessage(), e);
+            return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(e)).setCallback(callback)
+                    .setStatus(400).build();
+        }
+    }
+
+    /**
+     * Handles '/deepSearch/{topoId}': searches the logs of a topology.
+     *
+     * <p>Query parameters: {@code search-string}, {@code num-matches}, {@code port},
+     * {@code start-file-offset}, {@code start-byte-offset}, {@code search-archived}, {@code callback}.
+     */
+    @GET
+    @Path("/deepSearch/{topoId}")
+    public Response deepSearch(@PathParam("topoId") String topologyId,
+                               @Context HttpServletRequest request) throws IOException {
+        String user = httpCredsHandler.getUserName(request);
+        String searchString = request.getParameter("search-string");
+        String numMatchesStr = request.getParameter("num-matches");
+        String portStr = request.getParameter("port");
+        String startFileOffset = request.getParameter("start-file-offset");
+        String startByteOffset = request.getParameter("start-byte-offset");
+        String searchArchived = request.getParameter("search-archived");
+        String callback = request.getParameter("callback");
+        String origin = request.getHeader("Origin");
+
+        return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr,
+                startFileOffset, startByteOffset, BooleanUtils.toBooleanObject(searchArchived), callback, origin);
+    }
+
+    /**
+     * URL-decodes a file name using UTF-8 rather than the platform default charset, so the result
+     * does not depend on the JVM the logviewer happens to run on.
+     *
+     * <p>NOTE(review): request parameters have usually been decoded once by the container already;
+     * the extra decode is kept from the original code - confirm that clients double-encode file names.
+     *
+     * @param encodedFileName URL-encoded file name, must not be null
+     * @return the decoded file name
+     * @throws IOException if the UTF-8 encoding is reported as unsupported
+     */
+    private static String decodeFileName(String encodedFileName) throws IOException {
+        return URLDecoder.decode(encodedFileName, FILE_NAME_ENCODING);
+    }
+
+    /**
+     * Builds the status 400 response returned when a required query parameter is absent.
+     */
+    private static Response missingParameter(String parameterKey) {
+        return Response.status(400).entity("Missing required query parameter '" + parameterKey + "'").build();
+    }
+
+    /**
+     * Parses the first value of the given query parameter as an integer.
+     *
+     * @param map request parameter map whose values are {@code String[]}
+     * @param parameterKey name of the parameter; it must be present in the map
+     * @return the parsed integer
+     * @throws InvalidRequestException if the value is not a valid integer
+     */
+    private int parseIntegerFromMap(Map<?, ?> map, String parameterKey) throws InvalidRequestException {
+        try {
+            return Integer.parseInt(((String[]) map.get(parameterKey))[0]);
+        } catch (NumberFormatException ex) {
+            throw new InvalidRequestException("Could not make an integer out of the query parameter '"
+                + parameterKey + "'", ex);
+        }
+    }
+
+}

Reply via email to