http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
new file mode 100644
index 0000000..29dff1b
--- /dev/null
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.handler;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.logviewer.LogviewerConstant;
+import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.ui.InvalidRequestException;
+import org.apache.storm.utils.Utils;
+import org.jooq.lambda.Seq;
+import org.jooq.lambda.Unchecked;
+import org.jooq.lambda.tuple.Tuple3;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.attribute.FileAttribute;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.joining;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+@RunWith(Enclosed.class)
+public class LogviewerLogSearchHandlerTest {
+
+    public static class SearchViaRestApi {
+        private String pattern = "needle";
+        private String expectedHost = "dev.null.invalid";
+        private Integer expectedPort = 8888;
+        private String logviewerUrlPrefix = "http://"; + expectedHost + ":" + 
expectedPort;
+
+        /*
+         When we click a link to the logviewer, we expect the match line to
+         be somewhere near the middle of the page.  So we subtract half of
+         the default page length from the offset at which we found the
+         match.
+         */
+        private Function<Integer, Integer> expOffsetFn = arg -> 
(LogviewerConstant.DEFAULT_BYTES_PER_PAGE / 2 - arg);
+
+        @Test(expected = RuntimeException.class)
+        public void testSearchViaRestApiThrowsIfBogusFileIsGiven() throws 
InvalidRequestException {
+            LogviewerLogSearchHandler handler = getSearchHandler();
+            handler.substringSearch(null, "a string");
+        }
+
+        @Test
+        public void testLogviewerLinkCentersTheMatchInThePage() throws 
UnknownHostException {
+            String expectedFname = "foobar.log";
+
+            LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                String actualUrl = handler.urlToMatchCenteredInLogPage(new 
byte[42], expectedFname, 27526, 8888);
+
+                assertEquals("http://"; + expectedHost + ":" + expectedPort + 
"/api/v1/log?file=" + expectedFname +
+                        "&start=1947&length=" + 
LogviewerConstant.DEFAULT_BYTES_PER_PAGE, actualUrl);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        @Test
+        public void testLogviewerLinkCentersTheMatchInThePageDaemon() throws 
UnknownHostException {
+            String expectedFname = "foobar.log";
+
+            LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                String actualUrl = handler.urlToMatchCenteredInLogPage(new 
byte[42], expectedFname, 27526, 8888);
+
+                assertEquals("http://"; + expectedHost + ":" + expectedPort + 
"/api/v1/daemonlog?file=" + expectedFname +
+                        "&start=1947&length=" + 
LogviewerConstant.DEFAULT_BYTES_PER_PAGE, actualUrl);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        @Test
+        public void testReturnsCorrectBeforeAndAfterContext() throws 
InvalidRequestException, UnknownHostException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                    "logviewer-search-context-tests.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(0, "",
+                        " 
needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle
 ",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                        ));
+
+                matches.add(buildMatchData(7, "needle ",
+                        
"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle
 needle\n",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                        ));
+
+                matches.add(buildMatchData(127,
+                        "needle 
needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
+                        " needle\n",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                        ));
+
+                // FIXME: currently fail on this...
+                matches.add(buildMatchData(134,
+                        " 
needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle
 ",
+                        "\n",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                        ));
+
+                expected.put("matches", matches);
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        @Test
+        public void testAreallySmallLogFile() throws InvalidRequestException, 
UnknownHostException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "small-worker.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(7, "000000 ",
+                        " 000000\n",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                expected.put("matches", matches);
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        @Test
+        public void testAreallySmallLogDaemonFile() throws 
InvalidRequestException, UnknownHostException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "small-worker.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+                Map<String, Object> searchResult = 
handler.substringSearchDaemonLog(file, pattern);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "yes");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(7, "000000 ",
+                        " 000000\n",
+                        pattern,
+                        "/api/v1/daemonlog?file=" + file.getName() + 
"&start=0&length=51200"
+                ));
+
+                expected.put("matches", matches);
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        // FIXME: failing now
+        @Test
+        public void testNoOffsetReturnedWhenFileEndsOnBufferOffset() throws 
InvalidRequestException, UnknownHostException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "test-3072.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern);
+                Map<String, Object> searchResult2 = 
handler.substringSearch(file, pattern, 1);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(3066,
+                        Seq.range(0, 128).map(x -> ".").collect(joining()),
+                        "",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                expected.put("matches", matches);
+
+                assertEquals(expected, searchResult);
+                assertEquals(expected, searchResult2);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        @Test
+        public void testNextByteOffsetsAreCorrectForEachMatch() throws 
UnknownHostException, InvalidRequestException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "test-worker.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+
+                List<Tuple3<Integer, Integer, Integer>> dataAndExpected = new 
ArrayList<>();
+                // numMatchesSought, numMatchesFound, expectedNextByteOffset
+                dataAndExpected.add(new Tuple3<>(1, 1, 11));
+                dataAndExpected.add(new Tuple3<>(2, 2, 2042));
+                dataAndExpected.add(new Tuple3<>(3, 3, 2052));
+                dataAndExpected.add(new Tuple3<>(4, 4, 3078));
+                dataAndExpected.add(new Tuple3<>(5, 5, 3196));
+                dataAndExpected.add(new Tuple3<>(6, 6, 3202));
+                dataAndExpected.add(new Tuple3<>(7, 7, 6252));
+                dataAndExpected.add(new Tuple3<>(8, 8, 6321));
+                dataAndExpected.add(new Tuple3<>(9, 9, 6397));
+                dataAndExpected.add(new Tuple3<>(10, 10, 6476));
+                dataAndExpected.add(new Tuple3<>(11, 11, 6554));
+                dataAndExpected.add(new Tuple3<>(12, 12, null));
+                dataAndExpected.add(new Tuple3<>(13, 12, null));
+
+                dataAndExpected.forEach(Unchecked.consumer(data -> {
+                    Map<String, Object> result = handler.substringSearch(file, 
pattern, data.v1());
+                    assertEquals(data.v3(), result.get("nextByteOffset"));
+                    assertEquals(data.v2().intValue(), ((List) 
result.get("matches")).size());
+                }));
+
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern, 7);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+                expected.put("nextByteOffset", 6252);
+
+                // FIXME: currently failing on below...
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(5,
+                        "Test ",
+                        " is near the beginning of the file.\nThis file 
assumes a buffer size of 2048 bytes, a max search string size of 1024 bytes, 
and a",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                matches.add(buildMatchData(2036,
+                        "ng 146\npadding 147\npadding 148\npadding 
149\npadding 150\npadding 151\npadding 152\npadding 153\nNear the end of a 1024 
byte block, a ",
+                        ".\nA needle that straddles a 1024 byte boundary 
should also be detected.\n\npadding 157\npadding 158\npadding 159\npadding 
160\npadding",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                matches.add(buildMatchData(2046,
+                        "ding 147\npadding 148\npadding 149\npadding 
150\npadding 151\npadding 152\npadding 153\nNear the end of a 1024 byte block, 
a needle.\nA ",
+                        " that straddles a 1024 byte boundary should also be 
detected.\n\npadding 157\npadding 158\npadding 159\npadding 160\npadding 
161\npaddi",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                matches.add(buildMatchData(3072,
+                        "adding 226\npadding 227\npadding 228\npadding 
229\npadding 230\npadding 231\npadding 232\npadding 233\npadding 234\npadding 
235\n\n\nHere a ",
+                        " occurs just after a 1024 byte boundary.  It should 
have the correct context.\n\nText with two adjoining matches: 
needleneedle\n\npa",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                matches.add(buildMatchData(3190,
+                        "\n\n\nHere a needle occurs just after a 1024 byte 
boundary.  It should have the correct context.\n\nText with two adjoining 
matches: ",
+                        "needle\n\npadding 243\npadding 244\npadding 
245\npadding 246\npadding 247\npadding 248\npadding 249\npadding 250\npadding 
251\npadding 252\n",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                matches.add(buildMatchData(3196,
+                        "e a needle occurs just after a 1024 byte boundary.  
It should have the correct context.\n\nText with two adjoining matches: needle",
+                        "\n\npadding 243\npadding 244\npadding 245\npadding 
246\npadding 247\npadding 248\npadding 249\npadding 250\npadding 251\npadding 
252\npaddin",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                matches.add(buildMatchData(6246,
+                        
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\nHere are 
four non-ascii 1-byte UTF-8 characters: αβγδε\n\n",
+                        "\n\nHere are four printable 2-byte UTF-8 characters: 
¡¢£¤¥\n\nneedle\n\n\n\nHere are four printable 3-byte UTF-8 characters: 
ऄअ",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                expected.put("matches", matches);
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        // FIXME: this is failing...
+        @Test
+        public void testCorrectMatchOffsetIsReturnedWhenSkippingBytes() throws 
InvalidRequestException, UnknownHostException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "test-worker.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+
+                int startByteOffset = 3197;
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern, 1, startByteOffset);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", startByteOffset);
+                expected.put("nextByteOffset", 6252);
+
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(6246,
+                        
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\nHere are 
four non-ascii 1-byte UTF-8 characters: αβγδε\n\n",
+                        "\n\nHere are four printable 2-byte UTF-8 characters: 
¡¢£¤¥\n\nneedle\n\n\n\nHere are four printable 3-byte UTF-8 characters: 
ऄअ",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                expected.put("matches", matches);
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        // FIXME: this is failing...
+        // java.lang.StringIndexOutOfBoundsException: String index out of 
range: -39
+        @Test
+        public void testAnotherPatterns1() throws UnknownHostException, 
InvalidRequestException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "test-worker.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+
+                String pattern = Seq.range(0, 1024).map(x -> 
"X").collect(joining());
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern, 2);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+                expected.put("nextByteOffset", 6183);
+
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(4075,
+                        "\n\nThe following match of 1024 bytes completely 
fills half the byte buffer.  It is a search substring of the maximum 
size......\n\n",
+                        "\nThe following max-size match straddles a 1024 byte 
buffer.\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                matches.add(buildMatchData(5159,
+                        
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nThe 
following max-size match straddles a 1024 byte buffer.\n",
+                        "\n\nHere are four non-ascii 1-byte UTF-8 characters: 
αβγδε\n\nneedle\n\nHere are four printable 2-byte UTF-8 characters: 
¡¢£¤",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                expected.put("matches", Collections.emptyList());
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        // FIXME: this is failing...
+        @Test
+        public void testAnotherPatterns2() throws UnknownHostException, 
InvalidRequestException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "test-worker.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+
+                String pattern = "𐄀𐄁𐄂";
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern, 1);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+                expected.put("nextByteOffset", 7176);
+
+                List<Map<String, Object>> matches = new ArrayList<>();
+
+                matches.add(buildMatchData(7164,
+                        "padding 372\npadding 373\npadding 374\npadding 
375\n\nThe following tests multibyte UTF-8 Characters straddling the byte 
boundary:   ",
+                        "\n\nneedle",
+                        pattern,
+                        "/api/v1/log?file=test%2Fresources%2F" + 
file.getName() + "&start=0&length=51200"
+                ));
+
+                expected.put("matches", matches);
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        @Test
+        public void testReturnsZeroMatchesForUnseenPattern() throws 
UnknownHostException, InvalidRequestException {
+            Utils prevUtils = null;
+            try {
+                Utils mockedUtil = mock(Utils.class);
+                prevUtils = Utils.setInstance(mockedUtil);
+
+                String pattern = "Not There";
+
+                when(mockedUtil.hostname()).thenReturn(expectedHost);
+
+                File file = new File(String.join(File.separator, "src", 
"test", "resources"),
+                        "test-worker.log.test");
+
+                LogviewerLogSearchHandler handler = 
getSearchHandlerWithPort(expectedPort);
+
+                Map<String, Object> searchResult = 
handler.substringSearch(file, pattern);
+
+                Map<String, Object> expected = new HashMap<>();
+                expected.put("isDaemon", "no");
+                expected.put("searchString", pattern);
+                expected.put("startByteOffset", 0);
+
+                expected.put("matches", Collections.emptyList());
+
+                assertEquals(expected, searchResult);
+            } finally {
+                Utils.setInstance(prevUtils);
+            }
+        }
+
+        private Map<String, Object> buildMatchData(int byteOffset, String 
beforeString, String afterString,
+                                                   String matchString, String 
logviewerUrlPath) {
+            Map<String, Object> match = new HashMap<>();
+            match.put("byteOffset", byteOffset);
+            match.put("beforeString", beforeString);
+            match.put("afterString", afterString);
+            match.put("matchString", matchString);
+            match.put("logviewerURL", logviewerUrlPrefix + logviewerUrlPath);
+            return match;
+        }
+    }
+
+    public static class FindNMatchesTest {
+        /**
+         * find-n-matches looks through logs properly.
+         */
+        @Test
+        public void testFindNMatches() {
+            List<File> files = new ArrayList<>();
+            files.add(new File(String.join(File.separator, "src", "test", 
"resources"),
+                    "logviewer-search-context-tests.log.test"));
+            files.add(new File(String.join(File.separator, "src", "test", 
"resources"),
+                    "logviewer-search-context-tests.log.gz"));
+
+            LogviewerLogSearchHandler handler = getSearchHandler();
+
+            List<Map<String, Object>> matches1 = handler.findNMatches(files, 
20, 0, 0, "needle").getMatches();
+            List<Map<String, Object>> matches2 = handler.findNMatches(files, 
20, 0, 126, "needle").getMatches();
+            List<Map<String, Object>> matches3 = handler.findNMatches(files, 
20, 1, 0, "needle").getMatches();
+
+            assertEquals(2, matches1.size());
+            assertEquals(4, ((List) matches1.get(0).get("matches")).size());
+            assertEquals(4, ((List) matches1.get(1).get("matches")).size());
+            
assertEquals("test/resources/logviewer-search-context-tests.log.test", 
matches1.get(0).get("fileName"));
+            
assertEquals("test/resources/logviewer-search-context-tests.log.gz", 
matches1.get(1).get("fileName"));
+
+            assertEquals(2, ((List) matches2.get(0).get("matches")).size());
+            assertEquals(4, ((List) matches2.get(1).get("matches")).size());
+
+            assertEquals(1, matches3.size());
+            assertEquals(4, ((List) matches3.get(0).get("matches")).size());
+        }
+    }
+
+    public static class TestDeepSearchLogs {
+
+        private List<File> logFiles;
+        private String topoPath;
+
+        @Before
+        public void setUp() throws IOException {
+            logFiles = new ArrayList<>();
+            logFiles.add(new File(String.join(File.separator, "src", "test", 
"resources"),
+                    "logviewer-search-context-tests.log.test"));
+            logFiles.add(new File(String.join(File.separator, "src", "test", 
"resources"),
+                    "logviewer-search-context-tests.log.gz"));
+
+            FileAttribute[] attrs = new FileAttribute[0];
+            topoPath = Files.createTempDirectory("topoA", 
attrs).toFile().getCanonicalPath();
+            new File(topoPath, "6400").createNewFile();
+            new File(topoPath, "6500").createNewFile();
+            new File(topoPath, "6600").createNewFile();
+            new File(topoPath, "6700").createNewFile();
+        }
+
+        @After
+        public void tearDown() {
+            if (StringUtils.isNotEmpty(topoPath)) {
+                try {
+                    Utils.forceDelete(topoPath);
+                } catch (IOException e) {
+                    // ignore...
+                }
+            }
+        }
+
+        @Test
+        public void testAllPortsAndSearchArchivedIsTrue() throws IOException {
+            LogviewerLogSearchHandler handler = getStubbedSearchHandler();
+
+            handler.deepSearchLogsForTopology("", null, "search", "20", "*", 
"20", "199", true, null, null);
+
+            ArgumentCaptor<List> files = ArgumentCaptor.forClass(List.class);
+            ArgumentCaptor<Integer> numMatches = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> fileOffset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> offset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<String> search = 
ArgumentCaptor.forClass(String.class);
+
+            verify(handler, times(4)).findNMatches(files.capture(), 
numMatches.capture(), fileOffset.capture(),
+                    offset.capture(), search.capture());
+            verify(handler, times(4)).logsForPort(anyString(), 
any(File.class));
+
+            // File offset and byte offset should always be zero when 
searching multiple workers (multiple ports).
+            assertEquals(logFiles, files.getAllValues().get(0));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(0));
+            assertEquals("search", search.getAllValues().get(0));
+            assertEquals(logFiles, files.getAllValues().get(0));
+
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(1));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(1));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(1));
+            assertEquals("search", search.getAllValues().get(1));
+            assertEquals(logFiles, files.getAllValues().get(1));
+
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(2));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(2));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(2));
+            assertEquals("search", search.getAllValues().get(2));
+            assertEquals(logFiles, files.getAllValues().get(2));
+
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(3));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(3));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(3));
+            assertEquals("search", search.getAllValues().get(3));
+        }
+
+        @Test
+        public void testAllPortsAndSearchArchivedIsFalse() throws IOException {
+            LogviewerLogSearchHandler handler = getStubbedSearchHandler();
+
+            handler.deepSearchLogsForTopology("", null, "search", "20", null, 
"20", "199", false, null, null);
+
+            ArgumentCaptor<List> files = ArgumentCaptor.forClass(List.class);
+            ArgumentCaptor<Integer> numMatches = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> fileOffset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> offset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<String> search = 
ArgumentCaptor.forClass(String.class);
+
+            verify(handler, times(4)).findNMatches(files.capture(), 
numMatches.capture(), fileOffset.capture(),
+                    offset.capture(), search.capture());
+            verify(handler, times(4)).logsForPort(anyString(), 
any(File.class));
+
+            // File offset and byte offset should always be zero when 
searching multiple workers (multiple ports).
+            assertEquals(Collections.singletonList(logFiles.get(0)), 
files.getAllValues().get(0));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(0));
+            assertEquals("search", search.getAllValues().get(0));
+
+            assertEquals(Collections.singletonList(logFiles.get(0)), 
files.getAllValues().get(1));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(1));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(1));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(1));
+            assertEquals("search", search.getAllValues().get(1));
+
+            assertEquals(Collections.singletonList(logFiles.get(0)), 
files.getAllValues().get(2));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(2));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(2));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(2));
+            assertEquals("search", search.getAllValues().get(2));
+
+            assertEquals(Collections.singletonList(logFiles.get(0)), 
files.getAllValues().get(3));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(3));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(3));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(3));
+            assertEquals("search", search.getAllValues().get(3));
+        }
+
+        @Test
+        public void testOnePortAndSearchArchivedIsTrueAndNotFileOffset() 
throws IOException {
+            LogviewerLogSearchHandler handler = getStubbedSearchHandler();
+
+            handler.deepSearchLogsForTopology("", null, "search", "20", 
"6700", "0", "0", true, null, null);
+
+            ArgumentCaptor<List> files = ArgumentCaptor.forClass(List.class);
+            ArgumentCaptor<Integer> numMatches = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> fileOffset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> offset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<String> search = 
ArgumentCaptor.forClass(String.class);
+
+            verify(handler, times(1)).findNMatches(files.capture(), 
numMatches.capture(), fileOffset.capture(),
+                    offset.capture(), search.capture());
+            verify(handler, times(2)).logsForPort(anyString(), 
any(File.class));
+
+            assertEquals(logFiles, files.getAllValues().get(0));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(0));
+            assertEquals("search", search.getAllValues().get(0));
+        }
+
+        @Test
+        public void testOnePortAndSearchArchivedIsTrueAndFileOffsetIs1() 
throws IOException {
+            LogviewerLogSearchHandler handler = getStubbedSearchHandler();
+
+            handler.deepSearchLogsForTopology("", null, "search", "20", 
"6700", "1", "0", true, null, null);
+
+            ArgumentCaptor<List> files = ArgumentCaptor.forClass(List.class);
+            ArgumentCaptor<Integer> numMatches = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> fileOffset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> offset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<String> search = 
ArgumentCaptor.forClass(String.class);
+
+            verify(handler, times(1)).findNMatches(files.capture(), 
numMatches.capture(), fileOffset.capture(),
+                    offset.capture(), search.capture());
+            verify(handler, times(2)).logsForPort(anyString(), 
any(File.class));
+
+            assertEquals(logFiles, files.getAllValues().get(0));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(0));
+            assertEquals(Integer.valueOf(1), fileOffset.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(0));
+            assertEquals("search", search.getAllValues().get(0));
+        }
+
+        @Test
+        public void testOnePortAndSearchArchivedIsFalseAndFileOffsetIs1() 
throws IOException {
+            LogviewerLogSearchHandler handler = getStubbedSearchHandler();
+
+            handler.deepSearchLogsForTopology("", null, "search", "20", 
"6700", "1", "0", false, null, null);
+
+            ArgumentCaptor<List> files = ArgumentCaptor.forClass(List.class);
+            ArgumentCaptor<Integer> numMatches = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> fileOffset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> offset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<String> search = 
ArgumentCaptor.forClass(String.class);
+
+            verify(handler, times(1)).findNMatches(files.capture(), 
numMatches.capture(), fileOffset.capture(),
+                    offset.capture(), search.capture());
+            verify(handler, times(2)).logsForPort(anyString(), 
any(File.class));
+
+            // File offset should be zero, since search-archived is false.
+            assertEquals(Collections.singletonList(logFiles.get(0)), 
files.getAllValues().get(0));
+            assertEquals(Integer.valueOf(20), 
numMatches.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), fileOffset.getAllValues().get(0));
+            assertEquals(Integer.valueOf(0), offset.getAllValues().get(0));
+            assertEquals("search", search.getAllValues().get(0));
+        }
+
+        @Test
+        public void 
testOnePortAndSearchArchivedIsTrueAndFileOffsetIs1AndByteOffsetIs100() throws 
IOException {
+            LogviewerLogSearchHandler handler = getStubbedSearchHandler();
+
+            handler.deepSearchLogsForTopology("", null, "search", "20", 
"6700", "1", "100", true, null, null);
+
+            verify(handler, times(1)).findNMatches(anyListOf(File.class), 
anyInt(), anyInt(), anyInt(), anyString());
+            verify(handler, times(2)).logsForPort(anyString(), 
any(File.class));
+        }
+
+        @Test
+        public void testBadPortAndSearchArchivedIsFalseAndFileOffsetIs1() 
throws IOException {
+            LogviewerLogSearchHandler handler = getStubbedSearchHandler();
+
+            handler.deepSearchLogsForTopology("", null, "search", "20", 
"2700", "1", "0", false, null, null);
+
+            ArgumentCaptor<List> files = ArgumentCaptor.forClass(List.class);
+            ArgumentCaptor<Integer> numMatches = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> fileOffset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<Integer> offset = 
ArgumentCaptor.forClass(Integer.class);
+            ArgumentCaptor<String> search = 
ArgumentCaptor.forClass(String.class);
+
+            // Called with a bad port (not in the config) No searching should 
be done.
+            verify(handler, never()).findNMatches(files.capture(), 
numMatches.capture(), fileOffset.capture(),
+                    offset.capture(), search.capture());
+            verify(handler, never()).logsForPort(anyString(), any(File.class));
+        }
+
+        private LogviewerLogSearchHandler getStubbedSearchHandler() {
+            Map<String, Object> stormConf = Utils.readStormConfig();
+            LogviewerLogSearchHandler handler = new 
LogviewerLogSearchHandler(stormConf, topoPath, null,
+                    new ResourceAuthorizer(stormConf));
+            handler = spy(handler);
+
+            doReturn(logFiles).when(handler).logsForPort(anyString(), 
any(File.class));
+            doAnswer(invocationOnMock -> {
+                Object[] arguments = invocationOnMock.getArguments();
+                int fileOffset = (Integer) arguments[2];
+                String search = (String) arguments[4];
+
+                return new LogviewerLogSearchHandler.Matched(fileOffset, 
search, Collections.emptyList());
+            }).when(handler).findNMatches(anyListOf(File.class), anyInt(), 
anyInt(), anyInt(), anyString());
+
+            return handler;
+        }
+    }
+
+    private static LogviewerLogSearchHandler getSearchHandler() {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+        return new LogviewerLogSearchHandler(stormConf, null, null,
+                new ResourceAuthorizer(stormConf));
+    }
+
+    private static LogviewerLogSearchHandler getSearchHandlerWithPort(int 
port) {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+        stormConf.put(DaemonConfig.LOGVIEWER_PORT, port);
+        return new LogviewerLogSearchHandler(stormConf, null, null,
+                new ResourceAuthorizer(stormConf));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/ArgumentsVerifier.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/ArgumentsVerifier.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/ArgumentsVerifier.java
new file mode 100644
index 0000000..e52c9db
--- /dev/null
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/ArgumentsVerifier.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.testsupport;
+
+import org.mockito.ArgumentCaptor;
+
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+public class ArgumentsVerifier {
+    public static <T> void 
verifyFirstCallArgsForSingleArgMethod(Consumer<ArgumentCaptor<T>> 
verifyConsumer,
+                                                                 Class<T> 
argClazz, T expectedArg) {
+        ArgumentCaptor<T> captor = ArgumentCaptor.forClass(argClazz);
+        verifyConsumer.accept(captor);
+        assertEquals(expectedArg, captor.getAllValues().get(0));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockDirectoryBuilder.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockDirectoryBuilder.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockDirectoryBuilder.java
new file mode 100644
index 0000000..7215456
--- /dev/null
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockDirectoryBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.testsupport;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockDirectoryBuilder {
+    private String dirName;
+    private long mtime;
+    private File[] files;
+
+    public MockDirectoryBuilder() {
+        this.dirName = "adir";
+        this.mtime = 1;
+        this.files = new File[]{};
+    }
+
+    public MockDirectoryBuilder setDirName(String name) {
+        this.dirName = name;
+        return this;
+    }
+
+    public MockDirectoryBuilder setMtime(long mtime) {
+        this.mtime = mtime;
+        return this;
+    }
+
+    public MockDirectoryBuilder setFiles(File[] files) {
+        this.files = files;
+        return this;
+    }
+
+    public File build() {
+        File mockFile = mock(File.class);
+        when(mockFile.getName()).thenReturn(dirName);
+        when(mockFile.lastModified()).thenReturn(mtime);
+        when(mockFile.isFile()).thenReturn(false);
+        when(mockFile.listFiles()).thenReturn(files);
+        try {
+            when(mockFile.getCanonicalPath()).thenReturn(dirName);
+        } catch (IOException e) {
+            // we're making mock, ignoring...
+        }
+        return mockFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockFileBuilder.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockFileBuilder.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockFileBuilder.java
new file mode 100644
index 0000000..385eec4
--- /dev/null
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockFileBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.testsupport;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockFileBuilder {
+    private String fileName;
+    private long mtime;
+    private long length;
+
+    public MockFileBuilder() {
+        this.fileName = "afile";
+        this.mtime = 1;
+        this.length = 10 * 1024 * 1024 * 1024;
+    }
+
+    public MockFileBuilder setFileName(String fileName) {
+        this.fileName = fileName;
+        return this;
+    }
+
+    public MockFileBuilder setMtime(long mtime) {
+        this.mtime = mtime;
+        return this;
+    }
+
+    public MockFileBuilder setLength(long length) {
+        this.length = length;
+        return this;
+    }
+
+    public File build() {
+        File mockFile = mock(File.class);
+        when(mockFile.getName()).thenReturn(fileName);
+        when(mockFile.lastModified()).thenReturn(mtime);
+        when(mockFile.isFile()).thenReturn(true);
+        try {
+            
when(mockFile.getCanonicalPath()).thenReturn("/mock/canonical/path/to/" + 
fileName);
+        } catch (IOException e) {
+            // we're making mock, ignoring...
+        }
+        when(mockFile.length()).thenReturn(length);
+        return mockFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
new file mode 100644
index 0000000..0c7d69c
--- /dev/null
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder;
+import org.apache.storm.daemon.logviewer.testsupport.MockFileBuilder;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.jooq.lambda.Seq;
+import org.junit.Test;
+import org.mockito.internal.util.collections.Sets;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.storm.Config.SUPERVISOR_WORKER_TIMEOUT_SECS;
+import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS;
+import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Matchers.anySetOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LogCleanerTest {
+    /**
+     * Log file filter selects the correct worker-log dirs for purge.
+     */
+    @Test
+    public void testMkFileFilterForLogCleanup() throws IOException {
+        DirectoryCleaner mockDirectoryCleaner = mock(DirectoryCleaner.class);
+        
when(mockDirectoryCleaner.getStreamForDirectory(any(File.class))).thenAnswer(invocationOnMock
 -> {
+            File file = (File) invocationOnMock.getArguments()[0];
+            List<Path> paths = Arrays.stream(file.listFiles()).map(f -> 
mkMockPath(f)).collect(toList());
+            return mkDirectoryStream(paths);
+        });
+
+        // this is to read default value for other configurations
+        Map<String, Object> conf = Utils.readStormConfig();
+        conf.put(LOGVIEWER_CLEANUP_AGE_MINS, 60);
+        conf.put(LOGVIEWER_CLEANUP_INTERVAL_SECS, 300);
+
+        LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner);
+
+        long nowMillis = Time.currentTimeMillis();
+        long cutoffMillis = new LogCleaner(conf, 
mockDirectoryCleaner).cleanupCutoffAgeMillis(nowMillis);
+        long oldMtimeMillis = cutoffMillis - 500;
+        long newMtimeMillis = cutoffMillis + 500;
+
+        List<File> matchingFiles = new ArrayList<>();
+        matchingFiles.add(new 
MockDirectoryBuilder().setDirName("3031").setMtime(oldMtimeMillis).build());
+        matchingFiles.add(new 
MockDirectoryBuilder().setDirName("3032").setMtime(oldMtimeMillis).build());
+        matchingFiles.add(new 
MockDirectoryBuilder().setDirName("7077").setMtime(oldMtimeMillis).build());
+
+        List<File> excludedFiles = new ArrayList<>();
+        excludedFiles.add(new 
MockFileBuilder().setFileName("oldlog-1-2-worker-.log").setMtime(oldMtimeMillis).build());
+        excludedFiles.add(new 
MockFileBuilder().setFileName("newlog-1-2-worker-.log").setMtime(newMtimeMillis).build());
+        excludedFiles.add(new 
MockFileBuilder().setFileName("some-old-file.txt").setMtime(oldMtimeMillis).build());
+        excludedFiles.add(new 
MockFileBuilder().setFileName("olddir-1-2-worker.log").setMtime(newMtimeMillis).build());
+        excludedFiles.add(new 
MockDirectoryBuilder().setDirName("metadata").setMtime(newMtimeMillis).build());
+        excludedFiles.add(new 
MockDirectoryBuilder().setDirName("newdir").setMtime(newMtimeMillis).build());
+
+        FileFilter fileFilter = 
logCleaner.mkFileFilterForLogCleanup(nowMillis);
+
+        assertTrue(matchingFiles.stream().allMatch(fileFilter::accept));
+        assertTrue(excludedFiles.stream().noneMatch(fileFilter::accept));
+    }
+
+    /**
+     * cleaner deletes oldest files in each worker dir if files are larger 
than per-dir quota.
+     */
+    @Test
+    public void testPerWorkerDirectoryCleanup() throws IOException {
+        Utils prevUtils = null;
+        try {
+            Utils mockUtils = mock(Utils.class);
+            prevUtils = Utils.setInstance(mockUtils);
+
+            DirectoryCleaner mockDirectoryCleaner = 
mock(DirectoryCleaner.class);
+            
when(mockDirectoryCleaner.getStreamForDirectory(any(File.class))).thenAnswer(invocationOnMock
 -> {
+                File file = (File) invocationOnMock.getArguments()[0];
+                List<Path> paths = Arrays.stream(file.listFiles()).map(f -> 
mkMockPath(f)).collect(toList());
+                return mkDirectoryStream(paths);
+            });
+            
when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), 
anyLong(), anyBoolean(), anySetOf(String.class)))
+                    .thenCallRealMethod();
+
+            Map<String, Object> conf = Utils.readStormConfig();
+            LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner);
+
+            long nowMillis = Time.currentTimeMillis();
+
+            List<File> files1 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("A" + idx)
+                    .setMtime(nowMillis + (100 * idx)).setLength(200).build())
+                    .collect(toList());
+            List<File> files2 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("B" + idx)
+                    .setMtime(nowMillis + (100 * idx)).setLength(200).build())
+                    .collect(toList());
+            List<File> files3 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("C" + idx)
+                    .setMtime(nowMillis + (100 * idx)).setLength(200).build())
+                    .collect(toList());
+            File port1Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port1")
+                    .setFiles(files1.toArray(new File[]{})).build();
+            File port2Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port2")
+                    .setFiles(files2.toArray(new File[]{})).build();
+            File port3Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo2/port3")
+                    .setFiles(files3.toArray(new File[]{})).build();
+
+            File[] topo1Files = new File[] { port1Dir, port2Dir };
+            File[] topo2Files = new File[] { port3Dir };
+            File topo1Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1")
+                    .setFiles(topo1Files).build();
+            File topo2Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo2")
+                    .setFiles(topo2Files).build();
+
+            File[] rootFiles = new File[] { topo1Dir, topo2Dir };
+            File rootDir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts")
+                    .setFiles(rootFiles).build();
+
+            List<Integer> deletedFiles = 
logCleaner.perWorkerDirCleanup(rootDir, 1200, mockDirectoryCleaner);
+            assertEquals(Integer.valueOf(4), deletedFiles.get(0));
+            assertEquals(Integer.valueOf(4), deletedFiles.get(1));
+            assertEquals(Integer.valueOf(4), 
deletedFiles.get(deletedFiles.size() - 1));
+        } finally {
+            Utils.setInstance(prevUtils);
+        }
+    }
+
+    @Test
+    public void testGlobalLogCleanup() throws Exception {
+        Utils prevUtils = null;
+        try {
+            Utils mockUtils = mock(Utils.class);
+            prevUtils = Utils.setInstance(mockUtils);
+
+            DirectoryCleaner mockDirectoryCleaner = 
mock(DirectoryCleaner.class);
+            
when(mockDirectoryCleaner.getStreamForDirectory(any(File.class))).thenAnswer(invocationOnMock
 -> {
+                File file = (File) invocationOnMock.getArguments()[0];
+                List<Path> paths = Arrays.stream(file.listFiles()).map(f -> 
mkMockPath(f)).collect(toList());
+                return mkDirectoryStream(paths);
+            });
+            
when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), 
anyLong(), anyBoolean(), anySetOf(String.class)))
+                    .thenCallRealMethod();
+
+            Map<String, Object> conf = Utils.readStormConfig();
+
+            LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner) 
{
+                @Override
+                SortedSet<String> getAliveWorkerDirs(File rootDir) throws 
Exception {
+                    return new 
TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
+                }
+            };
+
+            long nowMillis = Time.currentTimeMillis();
+
+            List<File> files1 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("A" + idx + ".log")
+                    .setMtime(nowMillis + (100 * idx)).setLength(200).build())
+                    .collect(toList());
+            List<File> files2 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("B" + idx)
+                    .setMtime(nowMillis + (100 * idx)).setLength(200).build())
+                    .collect(toList());
+            List<File> files3 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("C" + idx)
+                    .setMtime(nowMillis + (100 * idx)).setLength(200).build())
+                    .collect(toList());
+
+            // note that port1Dir is active worker containing active logs
+            File port1Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port1")
+                    .setFiles(files1.toArray(new File[]{})).build();
+            File port2Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port2")
+                    .setFiles(files2.toArray(new File[]{})).build();
+            File port3Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo2/port3")
+                    .setFiles(files3.toArray(new File[]{})).build();
+
+            File[] topo1Files = new File[] { port1Dir, port2Dir };
+            File[] topo2Files = new File[] { port3Dir };
+            File topo1Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1")
+                    .setFiles(topo1Files).build();
+            File topo2Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo2")
+                    .setFiles(topo2Files).build();
+
+            File[] rootFiles = new File[] { topo1Dir, topo2Dir };
+            File rootDir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts")
+                    .setFiles(rootFiles).build();
+
+            int deletedFiles = logCleaner.globalLogCleanup(rootDir, 2400, 
mockDirectoryCleaner);
+            assertEquals(18, deletedFiles);
+        } finally {
+            Utils.setInstance(prevUtils);
+        }
+    }
+
+    /**
+     * Build up workerid-workerlogdir map for the old workers' dirs.
+     */
+    @Test
+    public void testIdentifyWorkerLogDirs() throws Exception {
+        File port1Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port1").build();
+        File mockMetaFile = new 
MockFileBuilder().setFileName("worker.yaml").build();
+
+        String expId = "id12345";
+        Map<String, File> expected = Collections.singletonMap(expId, port1Dir);
+
+        try {
+            SupervisorUtils mockedSupervisorUtils = 
mock(SupervisorUtils.class);
+            SupervisorUtils.setInstance(mockedSupervisorUtils);
+
+            Map<String, Object> stormConf = Utils.readStormConfig();
+            LogCleaner logCleaner = new LogCleaner(stormConf, new 
DirectoryCleaner()) {
+                @Override
+                Optional<File> getMetadataFileForWorkerLogDir(File logDir) 
throws IOException {
+                    return Optional.of(mockMetaFile);
+                }
+
+                @Override
+                String getWorkerIdFromMetadataFile(String metaFile) {
+                    return expId;
+                }
+            };
+
+            
when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, 
Object.class))).thenReturn(null);
+            assertEquals(expected, 
logCleaner.identifyWorkerLogDirs(Collections.singleton(port1Dir)));
+        } finally {
+            SupervisorUtils.resetInstance();
+        }
+    }
+
+    /**
+     * return directories for workers that are not alive.
+     */
+    @Test
+    public void testGetDeadWorkerDirs() throws Exception {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+        stormConf.put(SUPERVISOR_WORKER_TIMEOUT_SECS, 5);
+
+        LSWorkerHeartbeat hb = new LSWorkerHeartbeat();
+        hb.set_time_secs(1);
+
+        Map<String, LSWorkerHeartbeat> idToHb = Collections.singletonMap("42", 
hb);
+        int nowSecs = 2;
+        File unexpectedDir1 = new 
MockDirectoryBuilder().setDirName("dir1").build();
+        File expectedDir2 = new 
MockDirectoryBuilder().setDirName("dir2").build();
+        File expectedDir3 = new 
MockDirectoryBuilder().setDirName("dir3").build();
+        Set<File> logDirs = Sets.newSet(unexpectedDir1, expectedDir2, 
expectedDir3);
+
+        try {
+            SupervisorUtils mockedSupervisorUtils = 
mock(SupervisorUtils.class);
+            SupervisorUtils.setInstance(mockedSupervisorUtils);
+
+            LogCleaner logCleaner = new LogCleaner(stormConf, new 
DirectoryCleaner()) {
+                @Override
+                Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
+                    Map<String, File> ret = new HashMap<>();
+                    ret.put("42", unexpectedDir1);
+                    ret.put("007", expectedDir2);
+                    // this tests a directory with no yaml file thus no worker 
id
+                    ret.put("", expectedDir3);
+
+                    return ret;
+                }
+            };
+
+            
when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, 
Object.class))).thenReturn(idToHb);
+            assertEquals(Sets.newSet(expectedDir2, expectedDir3), 
logCleaner.getDeadWorkerDirs(nowSecs, logDirs));
+        } finally {
+            SupervisorUtils.resetInstance();
+        }
+    }
+
+    /**
+     * cleanup function forceDeletes files of dead workers
+     */
+    @Test
+    public void testCleanupFn() throws IOException {
+        File mockFile1 = new 
MockFileBuilder().setFileName("delete-me1").build();
+        File mockFile2 = new 
MockFileBuilder().setFileName("delete-me2").build();
+
+        Utils prevUtils = null;
+        try {
+            Utils mockUtils = mock(Utils.class);
+            prevUtils = Utils.setInstance(mockUtils);
+
+            List<String> forceDeleteArgs = new ArrayList<>();
+            doAnswer(invocationOnMock -> {
+                String path = (String) invocationOnMock.getArguments()[0];
+                forceDeleteArgs.add(path);
+                return null;
+            }).when(mockUtils).forceDelete(anyString());
+
+            LogCleaner logCleaner = new LogCleaner(Utils.readStormConfig(), 
new DirectoryCleaner()) {
+                @Override
+                Set<File> selectDirsForCleanup(long nowMillis, String rootDir) 
{
+                    return Collections.emptySet();
+                }
+
+                @Override
+                SortedSet<File> getDeadWorkerDirs(int nowSecs, Set<File> 
logDirs) throws Exception {
+                    SortedSet<File> dirs = new TreeSet<>();
+                    dirs.add(mockFile1);
+                    dirs.add(mockFile2);
+                    return dirs;
+                }
+
+                @Override
+                void cleanupEmptyTopoDirectory(File dir) throws IOException {
+                }
+            };
+
+            logCleaner.run();
+
+            assertEquals(2, forceDeleteArgs.size());
+            assertEquals(mockFile1.getCanonicalPath(), forceDeleteArgs.get(0));
+            assertEquals(mockFile2.getCanonicalPath(), forceDeleteArgs.get(1));
+        } finally {
+            Utils.setInstance(prevUtils);
+        }
+    }
+
+    private Path mkMockPath(File file) {
+        Path mockPath = mock(Path.class);
+        when(mockPath.toFile()).thenReturn(file);
+        return mockPath;
+    }
+
+    private DirectoryStream<Path> mkDirectoryStream(List<Path> listOfPaths) {
+        return new DirectoryStream<Path>() {
+            @Override
+            public Iterator<Path> iterator() {
+                return listOfPaths.iterator();
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizerTest.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizerTest.java
new file mode 100644
index 0000000..3d0e43e
--- /dev/null
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+import org.apache.storm.daemon.logviewer.testsupport.ArgumentsVerifier;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.Config.NIMBUS_ADMINS;
+import static org.apache.storm.Config.TOPOLOGY_GROUPS;
+import static org.apache.storm.Config.TOPOLOGY_USERS;
+import static org.apache.storm.DaemonConfig.LOGS_USERS;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ResourceAuthorizerTest {
+
+    /**
+     * allow cluster admin.
+     */
+    @Test
+    public void testAuthorizedLogUserAllowClusterAdmin() {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+
+        Map<String, Object> conf = new HashMap<>(stormConf);
+        conf.put(NIMBUS_ADMINS, Collections.singletonList("alice"));
+
+        ResourceAuthorizer authorizer = spy(new ResourceAuthorizer(conf));
+
+        doReturn(new 
ResourceAuthorizer.LogUserGroupWhitelist(Collections.emptySet(), 
Collections.emptySet()))
+                .when(authorizer).getLogUserGroupWhitelist(anyString());
+
+        
doReturn(Collections.emptySet()).when(authorizer).getUserGroups(anyString());
+
+        assertTrue(authorizer.isAuthorizedLogUser("alice", "non-blank-fname"));
+
+        verifyStubMethodsAreCalledProperly(authorizer);
+    }
+
+    /**
+     * ignore any cluster-set topology.users topology.groups.
+     */
+    @Test
+    public void 
testAuthorizedLogUserIgnoreAnyClusterSetTopologyUsersAndTopologyGroups() {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+
+        Map<String, Object> conf = new HashMap<>(stormConf);
+        conf.put(TOPOLOGY_USERS, Collections.singletonList("alice"));
+        conf.put(TOPOLOGY_GROUPS, Collections.singletonList("alice-group"));
+
+        ResourceAuthorizer authorizer = spy(new ResourceAuthorizer(conf));
+
+        doReturn(new 
ResourceAuthorizer.LogUserGroupWhitelist(Collections.emptySet(), 
Collections.emptySet()))
+                .when(authorizer).getLogUserGroupWhitelist(anyString());
+
+        
doReturn(Collections.singleton("alice-group")).when(authorizer).getUserGroups(anyString());
+
+        assertFalse(authorizer.isAuthorizedLogUser("alice", 
"non-blank-fname"));
+
+        verifyStubMethodsAreCalledProperly(authorizer);
+    }
+
+    /**
+     * allow cluster logs user.
+     */
+    @Test
+    public void testAuthorizedLogUserAllowClusterLogsUser() {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+
+        Map<String, Object> conf = new HashMap<>(stormConf);
+        conf.put(LOGS_USERS, Collections.singletonList("alice"));
+
+        ResourceAuthorizer authorizer = spy(new ResourceAuthorizer(conf));
+
+        doReturn(new 
ResourceAuthorizer.LogUserGroupWhitelist(Collections.emptySet(), 
Collections.emptySet()))
+                .when(authorizer).getLogUserGroupWhitelist(anyString());
+
+        
doReturn(Collections.emptySet()).when(authorizer).getUserGroups(anyString());
+
+        assertTrue(authorizer.isAuthorizedLogUser("alice", "non-blank-fname"));
+
+        verifyStubMethodsAreCalledProperly(authorizer);
+    }
+
+    /**
+     * allow whitelisted topology user.
+     */
+    @Test
+    public void testAuthorizedLogUserAllowWhitelistedTopologyUser() {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+
+        Map<String, Object> conf = new HashMap<>(stormConf);
+
+        ResourceAuthorizer authorizer = spy(new ResourceAuthorizer(conf));
+
+        doReturn(new 
ResourceAuthorizer.LogUserGroupWhitelist(Collections.singleton("alice"), 
Collections.emptySet()))
+                .when(authorizer).getLogUserGroupWhitelist(anyString());
+
+        
doReturn(Collections.emptySet()).when(authorizer).getUserGroups(anyString());
+
+        assertTrue(authorizer.isAuthorizedLogUser("alice", "non-blank-fname"));
+
+        verifyStubMethodsAreCalledProperly(authorizer);
+    }
+
+    /**
+     * allow whitelisted topology group.
+     */
+    @Test
+    public void testAuthorizedLogUserAllowWhitelistedTopologyGroup() {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+
+        Map<String, Object> conf = new HashMap<>(stormConf);
+
+        ResourceAuthorizer authorizer = spy(new ResourceAuthorizer(conf));
+
+        doReturn(new 
ResourceAuthorizer.LogUserGroupWhitelist(Collections.emptySet(), 
Collections.singleton("alice-group")))
+                .when(authorizer).getLogUserGroupWhitelist(anyString());
+
+        
doReturn(Collections.singleton("alice-group")).when(authorizer).getUserGroups(anyString());
+
+        assertTrue(authorizer.isAuthorizedLogUser("alice", "non-blank-fname"));
+
+        verifyStubMethodsAreCalledProperly(authorizer);
+    }
+
+    /**
+     * disallow user not in nimbus admin, topo user, logs user, or whitelist.
+     */
+    @Test
+    public void 
testAuthorizedLogUserDisallowUserNotInNimbusAdminNorTopoUserNorLogsUserNotWhitelist()
 {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+
+        Map<String, Object> conf = new HashMap<>(stormConf);
+
+        ResourceAuthorizer authorizer = spy(new ResourceAuthorizer(conf));
+
+        doReturn(new 
ResourceAuthorizer.LogUserGroupWhitelist(Collections.emptySet(), 
Collections.emptySet()))
+                .when(authorizer).getLogUserGroupWhitelist(anyString());
+
+        
doReturn(Collections.emptySet()).when(authorizer).getUserGroups(anyString());
+
+        assertFalse(authorizer.isAuthorizedLogUser("alice", 
"non-blank-fname"));
+
+        verifyStubMethodsAreCalledProperly(authorizer);
+    }
+
+    private void verifyStubMethodsAreCalledProperly(ResourceAuthorizer 
authorizer) {
+        ArgumentsVerifier.verifyFirstCallArgsForSingleArgMethod(
+                captor -> verify(authorizer, 
times(2)).getLogUserGroupWhitelist(captor.capture()),
+                String.class, "non-blank-fname");
+
+        ArgumentsVerifier.verifyFirstCallArgsForSingleArgMethod(
+                captor -> verify(authorizer).getUserGroups(captor.capture()),
+                String.class, "alice");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/resources/logviewer-search-context-tests.log.gz
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/resources/logviewer-search-context-tests.log.gz 
b/storm-webapp/src/test/resources/logviewer-search-context-tests.log.gz
new file mode 100644
index 0000000..5cf2a06
Binary files /dev/null and 
b/storm-webapp/src/test/resources/logviewer-search-context-tests.log.gz differ

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/resources/logviewer-search-context-tests.log.test
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/resources/logviewer-search-context-tests.log.test 
b/storm-webapp/src/test/resources/logviewer-search-context-tests.log.test
new file mode 100644
index 0000000..6e4d4af
--- /dev/null
+++ b/storm-webapp/src/test/resources/logviewer-search-context-tests.log.test
@@ -0,0 +1 @@
+needle 
needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle
 needle

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/resources/small-worker.log.test
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/resources/small-worker.log.test 
b/storm-webapp/src/test/resources/small-worker.log.test
new file mode 100644
index 0000000..27d61d1
--- /dev/null
+++ b/storm-webapp/src/test/resources/small-worker.log.test
@@ -0,0 +1 @@
+000000 needle 000000

http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/test/resources/test-3072.log.test
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/resources/test-3072.log.test 
b/storm-webapp/src/test/resources/test-3072.log.test
new file mode 100644
index 0000000..56dc6f1
--- /dev/null
+++ b/storm-webapp/src/test/resources/test-3072.log.test
@@ -0,0 +1,3 @@
+This is a test log file of size 3072.
+
+.....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
 
.....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
 
.....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
 ....................................needle
\ No newline at end of file

Reply via email to