Repository: storm
Updated Branches:
  refs/heads/master 1622f19c0 -> 3c53dd516


STORM-1280 port backtype.storm.daemon.logviewer to java

* apply recent change from logviewer.clj (0201ae7)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb5c247f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb5c247f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb5c247f

Branch: refs/heads/master
Commit: bb5c247f9ba595ec0e9dc88284da568011520aff
Parents: 44b268b
Author: Jungtaek Lim <[email protected]>
Authored: Fri Jul 14 11:56:28 2017 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Fri Jul 14 12:11:41 2017 +0900

----------------------------------------------------------------------
 .../storm/daemon/logviewer/LogviewerServer.java |   8 +-
 .../handler/LogviewerLogDownloadHandler.java    |   6 +-
 .../handler/LogviewerLogPageHandler.java        |   7 +-
 .../daemon/logviewer/utils/LogCleaner.java      |  82 +++--------
 .../daemon/logviewer/utils/WorkerLogs.java      | 141 +++++++++++++++++--
 .../logviewer/webapp/LogviewerApplication.java  |   6 +-
 .../handler/LogviewerLogPageHandlerTest.java    |   6 +-
 .../daemon/logviewer/utils/LogCleanerTest.java  |  81 ++++-------
 .../daemon/logviewer/utils/WorkerLogsTest.java  |  75 ++++++++++
 9 files changed, 274 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 2b26702..1b53809 100644
--- 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -23,10 +23,12 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
 import org.apache.storm.daemon.logviewer.utils.LogCleaner;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.ui.FilterConfiguration;
 import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Utils;
 import org.eclipse.jetty.server.Server;
@@ -38,6 +40,7 @@ import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -158,10 +161,13 @@ public class LogviewerServer implements AutoCloseable {
         Utils.setupDefaultUncaughtExceptionHandler();
         Map<String, Object> conf = Utils.readStormConfig();
 
+        String logRoot = ConfigUtils.workerArtifactsRoot(conf);
+        File logRootFile = new File(logRoot);
+        WorkerLogs workerLogs = new WorkerLogs(conf, logRootFile);
         DirectoryCleaner directoryCleaner = new DirectoryCleaner();
 
         try (LogviewerServer server = new LogviewerServer(conf);
-             LogCleaner logCleaner = new LogCleaner(conf, directoryCleaner)) {
+             LogCleaner logCleaner = new LogCleaner(conf, workerLogs, 
directoryCleaner, logRootFile)) {
             Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
             logCleaner.start();
             StormMetricsRegistry.startMetricsReporters(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
index b8f32f2..7529297 100644
--- 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
@@ -20,19 +20,23 @@ package org.apache.storm.daemon.logviewer.handler;
 
 import org.apache.storm.daemon.logviewer.utils.LogFileDownloader;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 
 public class LogviewerLogDownloadHandler {
 
+    private final WorkerLogs workerLogs;
     private final LogFileDownloader logFileDownloadHelper;
 
-    public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, 
ResourceAuthorizer resourceAuthorizer) {
+    public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, 
WorkerLogs workerLogs, ResourceAuthorizer resourceAuthorizer) {
+        this.workerLogs = workerLogs;
         this.logFileDownloadHelper = new LogFileDownloader(logRoot, 
daemonLogRoot, resourceAuthorizer);
     }
 
     public Response downloadLogFile(String fileName, String user) throws 
IOException {
+        workerLogs.setLogFilePermission(fileName);
         return logFileDownloadHelper.downloadFile(fileName, user, false);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
index 0881b69..d2488f6 100644
--- 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
@@ -62,12 +62,15 @@ import static 
org.apache.commons.lang.StringEscapeUtils.escapeHtml;
 public class LogviewerLogPageHandler {
     private final String logRoot;
     private final String daemonLogRoot;
+    private final WorkerLogs workerLogs;
     private final ResourceAuthorizer resourceAuthorizer;
 
     public LogviewerLogPageHandler(String logRoot, String daemonLogRoot,
+                                   WorkerLogs workerLogs,
                                    ResourceAuthorizer resourceAuthorizer) {
         this.logRoot = logRoot;
         this.daemonLogRoot = daemonLogRoot;
+        this.workerLogs = workerLogs;
         this.resourceAuthorizer = resourceAuthorizer;
     }
 
@@ -75,7 +78,7 @@ public class LogviewerLogPageHandler {
         List<File> fileResults = null;
         if (topologyId == null) {
             if (port == null) {
-                fileResults = WorkerLogs.getAllLogsForRootDir(new 
File(logRoot));
+                fileResults = workerLogs.getAllLogsForRootDir();
             } else {
                 fileResults = new ArrayList<>();
 
@@ -130,6 +133,8 @@ public class LogviewerLogPageHandler {
     public Response logPage(String fileName, Integer start, Integer length, 
String grep, String user) throws IOException, InvalidRequestException {
         String rootDir = logRoot;
         if (resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) {
+            workerLogs.setLogFilePermission(fileName);
+
             File file = new File(rootDir, fileName).getCanonicalFile();
             String path = file.getCanonicalPath();
             boolean isZipFile = path.endsWith(".gz");

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
index cca2a52..6e4e073 100644
--- 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
@@ -22,12 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.apache.storm.StormTimer;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.jooq.lambda.Unchecked;
-import org.jooq.lambda.tuple.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,23 +60,23 @@ import static 
org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_M
 
 public class LogCleaner implements Runnable, Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(LogCleaner.class);
-    public static final String WORKER_YAML = "worker.yaml";
 
     private final Map<String, Object> stormConf;
     private final Integer intervalSecs;
-    private final String logRootDir;
+    private final File logRootDir;
     private final DirectoryCleaner directoryCleaner;
+    private final WorkerLogs workerLogs;
 
     private StormTimer logviewerCleanupTimer;
     private final long maxSumWorkerLogsSizeMb;
     private long maxPerWorkerLogsSizeMb;
 
-    public LogCleaner(Map<String, Object> stormConf, DirectoryCleaner 
directoryCleaner) {
-        String logRootDir = ConfigUtils.workerArtifactsRoot(stormConf);
-
+    public LogCleaner(Map<String, Object> stormConf, WorkerLogs workerLogs, 
DirectoryCleaner directoryCleaner,
+                      File logRootDir) {
         this.stormConf = stormConf;
         this.intervalSecs = 
ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_INTERVAL_SECS), null);
         this.logRootDir = logRootDir;
+        this.workerLogs = workerLogs;
         this.directoryCleaner = directoryCleaner;
 
         maxSumWorkerLogsSizeMb = 
ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB));
@@ -121,7 +119,7 @@ public class LogCleaner implements Runnable, Closeable {
     public void run() {
         try {
             int nowSecs = Time.currentTimeSecs();
-            Set<File> oldLogDirs = selectDirsForCleanup(nowSecs * 1000, 
logRootDir);
+            Set<File> oldLogDirs = selectDirsForCleanup(nowSecs * 1000L);
 
             SortedSet<File> deadWorkerDirs = getDeadWorkerDirs(nowSecs, 
oldLogDirs);
 
@@ -141,8 +139,8 @@ public class LogCleaner implements Runnable, Closeable {
                 }
             }));
 
-            perWorkerDirCleanup(new File(logRootDir), maxPerWorkerLogsSizeMb * 
1024 * 1024, directoryCleaner);
-            globalLogCleanup(new File(logRootDir), maxSumWorkerLogsSizeMb * 
1024 * 1024, directoryCleaner);
+            perWorkerDirCleanup(maxPerWorkerLogsSizeMb * 1024 * 1024);
+            globalLogCleanup(maxSumWorkerLogsSizeMb * 1024 * 1024);
         } catch (Exception ex) {
             LOG.error("Exception while cleaning up old log.", ex);
         }
@@ -152,9 +150,9 @@ public class LogCleaner implements Runnable, Closeable {
      * Delete the oldest files in each overloaded worker log dir.
      */
     @VisibleForTesting
-    List<Integer> perWorkerDirCleanup(File rootDir, long size, 
DirectoryCleaner cleaner) {
-        return WorkerLogs.getAllWorkerDirs(rootDir).stream()
-                .map(Unchecked.function(dir -> 
cleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), size, true, 
null)))
+    List<Integer> perWorkerDirCleanup(long size) {
+        return workerLogs.getAllWorkerDirs().stream()
+                .map(Unchecked.function(dir -> 
directoryCleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), 
size, true, null)))
                 .collect(toList());
     }
 
@@ -162,53 +160,11 @@ public class LogCleaner implements Runnable, Closeable {
      * Delete the oldest files in overloaded worker-artifacts globally.
      */
     @VisibleForTesting
-    int globalLogCleanup(File rootDir, long size, DirectoryCleaner cleaner) 
throws Exception {
-        List<File> workerDirs = new 
ArrayList<>(WorkerLogs.getAllWorkerDirs(rootDir));
-        Set<String> aliveWorkerDirs = new 
HashSet<>(getAliveWorkerDirs(rootDir));
-
-        return cleaner.deleteOldestWhileTooLarge(workerDirs, size, false, 
aliveWorkerDirs);
-    }
-
-    /**
-     * Return a sorted set of java.io.Files that were written by workers that 
are now active.
-     */
-    @VisibleForTesting
-    SortedSet<String> getAliveWorkerDirs(File rootDir) throws Exception {
-        Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
-        Set<File> logDirs = WorkerLogs.getAllWorkerDirs(rootDir);
-        Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
-
-        return idToDir.entrySet().stream()
-                .filter(entry -> aliveIds.contains(entry.getKey()))
-                .map(Unchecked.function(entry -> 
entry.getValue().getCanonicalPath()))
-                .collect(toCollection(TreeSet::new));
-    }
+    int globalLogCleanup(long size) throws Exception {
+        List<File> workerDirs = new ArrayList<>(workerLogs.getAllWorkerDirs());
+        Set<String> aliveWorkerDirs = new 
HashSet<>(workerLogs.getAliveWorkerDirs());
 
-    @VisibleForTesting
-    Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
-        return logDirs.stream().map(Unchecked.function(logDir -> {
-            Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
-
-            return metaFile.map(Unchecked.function(m -> new 
Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir)))
-                    .orElse(new Tuple2<>("", logDir));
-        })).collect(toMap(Tuple2::v1, Tuple2::v2));
-    }
-
-    @VisibleForTesting
-    Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws 
IOException {
-        File metaFile = new File(logDir, WORKER_YAML);
-        if (metaFile.exists()) {
-            return Optional.of(metaFile);
-        } else {
-            LOG.warn("Could not find {} to clean up for {}", 
metaFile.getCanonicalPath(), logDir);
-            return Optional.empty();
-        }
-    }
-
-    @VisibleForTesting
-    String getWorkerIdFromMetadataFile(String metaFile) {
-        Map<String, Object> map = (Map<String, Object>)  
Utils.readYamlFile(metaFile);
-        return ObjectReader.getString(map.get("worker-id"), null);
+        return directoryCleaner.deleteOldestWhileTooLarge(workerDirs, size, 
false, aliveWorkerDirs);
     }
 
     private Set<String> getAliveIds(int nowSecs) throws Exception {
@@ -238,8 +194,8 @@ public class LogCleaner implements Runnable, Closeable {
         if (logDirs.isEmpty()) {
             return new TreeSet<>();
         } else {
-            Set<String> aliveIds = getAliveIds(nowSecs);
-            Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
+            Set<String> aliveIds = workerLogs.getAliveIds(nowSecs);
+            Map<String, File> idToDir = 
workerLogs.identifyWorkerLogDirs(logDirs);
 
             return idToDir.entrySet().stream()
                     .filter(entry -> !aliveIds.contains(entry.getKey()))
@@ -249,10 +205,10 @@ public class LogCleaner implements Runnable, Closeable {
     }
 
     @VisibleForTesting
-    Set<File> selectDirsForCleanup(long nowMillis, String rootDir) {
+    Set<File> selectDirsForCleanup(long nowMillis) {
         FileFilter fileFilter = mkFileFilterForLogCleanup(nowMillis);
 
-        return Arrays.stream(new File(rootDir).listFiles())
+        return Arrays.stream(logRootDir.listFiles())
                 .flatMap(topoDir -> 
Arrays.stream(topoDir.listFiles(fileFilter)))
                 .collect(toCollection(TreeSet::new));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index 1d70b31..6018b79 100644
--- 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -1,41 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.storm.daemon.logviewer.utils;
 
+import com.google.common.collect.Lists;
+import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
+import org.jooq.lambda.Unchecked;
+import org.jooq.lambda.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.storm.Config.SUPERVISOR_RUN_WORKER_AS_USER;
+import static org.apache.storm.Config.TOPOLOGY_SUBMITTER_USER;
 import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast;
 
 public class WorkerLogs {
+    private static final Logger LOG = 
LoggerFactory.getLogger(WorkerLogs.class);
+
+    public static final String WORKER_YAML = "worker.yaml";
+    private final Map<String, Object> stormConf;
+    private final File logRootDir;
 
-    private WorkerLogs() {
+    public WorkerLogs(Map<String, Object> stormConf, File logRootDir) {
+        this.stormConf = stormConf;
+        this.logRootDir = logRootDir;
     }
 
-    /**
-     * Return the path of the worker log with the format of 
topoId/port/worker.log.*
-     */
-    public static String getTopologyPortWorkerLog(File file) {
-        try {
-            String[] splitted = 
file.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR);
-            List<String> split = takeLast(Arrays.asList(splitted), 3);
+    public void setLogFilePermission(String fileName) throws IOException {
+        File file = new File(logRootDir, fileName).getCanonicalFile();
+        boolean runAsUser = 
ObjectReader.getBoolean(stormConf.get(SUPERVISOR_RUN_WORKER_AS_USER), false);
+        File parent = new File(logRootDir, fileName).getParentFile();
+        Optional<File> mdFile = (parent == null) ? Optional.empty() : 
getMetadataFileForWorkerLogDir(parent);
+        Optional<String> topoOwner = mdFile.isPresent() ?
+                
Optional.of(getTopologyOwnerFromMetadataFile(mdFile.get().getCanonicalPath())) :
+                Optional.empty();
 
-            return String.join(Utils.FILE_PATH_SEPARATOR, split);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        if (runAsUser && topoOwner.isPresent() && file.exists() && 
!Files.isReadable(file.toPath())) {
+            LOG.debug("Setting permissions on file {} with topo-owner {}", 
fileName, topoOwner);
+            ClientSupervisorUtils.processLauncherAndWait(stormConf, 
topoOwner.get(),
+                    Lists.newArrayList("blob", file.getCanonicalPath()), null,
+                    "setup group read permissions for file: " + fileName);
         }
     }
 
-    public static List<File> getAllLogsForRootDir(File logDir) throws 
IOException {
+    public List<File> getAllLogsForRootDir() throws IOException {
         List<File> files = new ArrayList<>();
-        Set<File> topoDirFiles = getAllWorkerDirs(logDir);
+        Set<File> topoDirFiles = getAllWorkerDirs();
         if (topoDirFiles != null) {
             for (File portDir : topoDirFiles) {
                 files.addAll(DirectoryCleaner.getFilesForDir(portDir));
@@ -45,8 +90,8 @@ public class WorkerLogs {
         return files;
     }
 
-    public static Set<File> getAllWorkerDirs(File rootDir) {
-        File[] rootDirFiles = rootDir.listFiles();
+    public Set<File> getAllWorkerDirs() {
+        File[] rootDirFiles = logRootDir.listFiles();
         if (rootDirFiles != null) {
             return Arrays.stream(rootDirFiles).flatMap(topoDir -> {
                 File[] topoFiles = topoDir.listFiles();
@@ -57,4 +102,70 @@ public class WorkerLogs {
         return new TreeSet<>();
     }
 
+    /**
+     * Return a sorted set of java.io.Files that were written by workers that 
are now active.
+     */
+    public SortedSet<String> getAliveWorkerDirs() throws Exception {
+        Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
+        Set<File> logDirs = getAllWorkerDirs();
+        Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
+
+        return idToDir.entrySet().stream()
+                .filter(entry -> aliveIds.contains(entry.getKey()))
+                .map(Unchecked.function(entry -> 
entry.getValue().getCanonicalPath()))
+                .collect(toCollection(TreeSet::new));
+    }
+
+    public Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws 
IOException {
+        File metaFile = new File(logDir, WORKER_YAML);
+        if (metaFile.exists()) {
+            return Optional.of(metaFile);
+        } else {
+            LOG.warn("Could not find {} to clean up for {}", 
metaFile.getCanonicalPath(), logDir);
+            return Optional.empty();
+        }
+    }
+
+    public String getWorkerIdFromMetadataFile(String metaFile) {
+        Map<String, Object> map = (Map<String, Object>) 
Utils.readYamlFile(metaFile);
+        return ObjectReader.getString(map.get("worker-id"), null);
+    }
+
+    public String getTopologyOwnerFromMetadataFile(String metaFile) {
+        Map<String, Object> map = (Map<String, Object>) 
Utils.readYamlFile(metaFile);
+        return ObjectReader.getString(map.get(TOPOLOGY_SUBMITTER_USER), null);
+    }
+
+    public Set<String> getAliveIds(int nowSecs) throws Exception {
+        return 
SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
+                .filter(entry -> Objects.nonNull(entry.getValue())
+                        && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, 
entry.getValue(), stormConf))
+                .map(Map.Entry::getKey)
+                .collect(toCollection(TreeSet::new));
+    }
+
+    public Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
+        // intentionally an instance method (not static) so it can be mocked in tests
+        return logDirs.stream().map(Unchecked.function(logDir -> {
+            Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
+
+            return metaFile.map(Unchecked.function(m -> new 
Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir)))
+                    .orElse(new Tuple2<>("", logDir));
+        })).collect(toMap(Tuple2::v1, Tuple2::v2));
+    }
+
+    /**
+     * Return the path of the worker log with the format of 
topoId/port/worker.log.*
+     */
+    public static String getTopologyPortWorkerLog(File file) {
+        try {
+            String[] splitted = 
file.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR);
+            List<String> split = takeLast(Arrays.asList(splitted), 3);
+
+            return String.join(Utils.FILE_PATH_SEPARATOR, split);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
index cbf478c..32a723e 100644
--- 
a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
+++ 
b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
@@ -28,6 +28,7 @@ import 
org.apache.storm.daemon.logviewer.handler.LogviewerProfileHandler;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
 import org.apache.storm.daemon.logviewer.handler.LogviewerLogDownloadHandler;
 import org.apache.storm.daemon.logviewer.handler.LogviewerLogPageHandler;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IHttpCredentialsPlugin;
 import org.apache.storm.utils.ConfigUtils;
@@ -55,11 +56,12 @@ public class LogviewerApplication extends Application {
         String daemonLogRoot = 
logRootDir(ObjectReader.getString(stormConf.get(LOGVIEWER_APPENDER_NAME)));
 
         ResourceAuthorizer resourceAuthorizer = new 
ResourceAuthorizer(stormConf);
+        WorkerLogs workerLogs = new WorkerLogs(stormConf, new File(logRoot));
 
-        LogviewerLogPageHandler logviewer = new 
LogviewerLogPageHandler(logRoot, daemonLogRoot, resourceAuthorizer);
+        LogviewerLogPageHandler logviewer = new 
LogviewerLogPageHandler(logRoot, daemonLogRoot, workerLogs, resourceAuthorizer);
         LogviewerProfileHandler profileHandler = new 
LogviewerProfileHandler(logRoot, resourceAuthorizer);
         LogviewerLogDownloadHandler logDownloadHandler = new 
LogviewerLogDownloadHandler(logRoot, daemonLogRoot,
-                resourceAuthorizer);
+                workerLogs, resourceAuthorizer);
         LogviewerLogSearchHandler logSearchHandler = new 
LogviewerLogSearchHandler(stormConf, logRoot, daemonLogRoot,
                 resourceAuthorizer);
         IHttpCredentialsPlugin httpCredsHandler = 
AuthUtils.GetUiHttpCredentialsPlugin(stormConf);

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
index 4b76abd..2c05972 100644
--- 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.storm.daemon.logviewer.handler;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.utils.Utils;
 import org.assertj.core.util.Lists;
 import org.junit.Test;
@@ -31,6 +32,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.FileAttribute;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -56,9 +58,9 @@ public class LogviewerLogPageHandlerTest {
         file3.createNewFile();
 
         String origin = "www.origin.server.net";
-
+        Map<String, Object> stormConf = Utils.readStormConfig();
         LogviewerLogPageHandler handler = new 
LogviewerLogPageHandler(rootPath, null,
-                new ResourceAuthorizer(Utils.readStormConfig()));
+                new WorkerLogs(stormConf, new File(rootPath)), new 
ResourceAuthorizer(stormConf));
 
         Response expectedAll = 
LogviewerResponseBuilder.buildSuccessJsonResponse(
                 Lists.newArrayList("topoA/port1/worker.log", 
"topoA/port2/worker.log", "topoB/port1/worker.log"),

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
index 0c7d69c..21f4e9e 100644
--- 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
@@ -80,10 +80,12 @@ public class LogCleanerTest {
         conf.put(LOGVIEWER_CLEANUP_AGE_MINS, 60);
         conf.put(LOGVIEWER_CLEANUP_INTERVAL_SECS, 300);
 
-        LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner);
+        WorkerLogs workerLogs = new WorkerLogs(conf, null);
+
+        LogCleaner logCleaner = new LogCleaner(conf, workerLogs, 
mockDirectoryCleaner, null);
 
         long nowMillis = Time.currentTimeMillis();
-        long cutoffMillis = new LogCleaner(conf, 
mockDirectoryCleaner).cleanupCutoffAgeMillis(nowMillis);
+        long cutoffMillis = logCleaner.cleanupCutoffAgeMillis(nowMillis);
         long oldMtimeMillis = cutoffMillis - 500;
         long newMtimeMillis = cutoffMillis + 500;
 
@@ -125,9 +127,6 @@ public class LogCleanerTest {
             
when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), 
anyLong(), anyBoolean(), anySetOf(String.class)))
                     .thenCallRealMethod();
 
-            Map<String, Object> conf = Utils.readStormConfig();
-            LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner);
-
             long nowMillis = Time.currentTimeMillis();
 
             List<File> files1 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("A" + idx)
@@ -157,7 +156,11 @@ public class LogCleanerTest {
             File rootDir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts")
                     .setFiles(rootFiles).build();
 
-            List<Integer> deletedFiles = 
logCleaner.perWorkerDirCleanup(rootDir, 1200, mockDirectoryCleaner);
+            Map<String, Object> conf = Utils.readStormConfig();
+            WorkerLogs workerLogs = new WorkerLogs(conf, rootDir);
+            LogCleaner logCleaner = new LogCleaner(conf, workerLogs, 
mockDirectoryCleaner, rootDir);
+
+            List<Integer> deletedFiles = logCleaner.perWorkerDirCleanup(1200);
             assertEquals(Integer.valueOf(4), deletedFiles.get(0));
             assertEquals(Integer.valueOf(4), deletedFiles.get(1));
             assertEquals(Integer.valueOf(4), 
deletedFiles.get(deletedFiles.size() - 1));
@@ -182,15 +185,6 @@ public class LogCleanerTest {
             
when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), 
anyLong(), anyBoolean(), anySetOf(String.class)))
                     .thenCallRealMethod();
 
-            Map<String, Object> conf = Utils.readStormConfig();
-
-            LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner) 
{
-                @Override
-                SortedSet<String> getAliveWorkerDirs(File rootDir) throws 
Exception {
-                    return new 
TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
-                }
-            };
-
             long nowMillis = Time.currentTimeMillis();
 
             List<File> files1 = Seq.range(0, 10).map(idx -> new 
MockFileBuilder().setFileName("A" + idx + ".log")
@@ -222,45 +216,19 @@ public class LogCleanerTest {
             File rootDir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts")
                     .setFiles(rootFiles).build();
 
-            int deletedFiles = logCleaner.globalLogCleanup(rootDir, 2400, 
mockDirectoryCleaner);
-            assertEquals(18, deletedFiles);
-        } finally {
-            Utils.setInstance(prevUtils);
-        }
-    }
-
-    /**
-     * Build up workerid-workerlogdir map for the old workers' dirs.
-     */
-    @Test
-    public void testIdentifyWorkerLogDirs() throws Exception {
-        File port1Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port1").build();
-        File mockMetaFile = new 
MockFileBuilder().setFileName("worker.yaml").build();
-
-        String expId = "id12345";
-        Map<String, File> expected = Collections.singletonMap(expId, port1Dir);
-
-        try {
-            SupervisorUtils mockedSupervisorUtils = 
mock(SupervisorUtils.class);
-            SupervisorUtils.setInstance(mockedSupervisorUtils);
-
-            Map<String, Object> stormConf = Utils.readStormConfig();
-            LogCleaner logCleaner = new LogCleaner(stormConf, new 
DirectoryCleaner()) {
-                @Override
-                Optional<File> getMetadataFileForWorkerLogDir(File logDir) 
throws IOException {
-                    return Optional.of(mockMetaFile);
-                }
-
+            Map<String, Object> conf = Utils.readStormConfig();
+            WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, rootDir) {
                 @Override
-                String getWorkerIdFromMetadataFile(String metaFile) {
-                    return expId;
+                public SortedSet<String> getAliveWorkerDirs() throws Exception 
{
+                    return new 
TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
                 }
             };
 
-            
when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, 
Object.class))).thenReturn(null);
-            assertEquals(expected, 
logCleaner.identifyWorkerLogDirs(Collections.singleton(port1Dir)));
+            LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, 
mockDirectoryCleaner, rootDir);
+            int deletedFiles = logCleaner.globalLogCleanup(2400);
+            assertEquals(18, deletedFiles);
         } finally {
-            SupervisorUtils.resetInstance();
+            Utils.setInstance(prevUtils);
         }
     }
 
@@ -286,9 +254,10 @@ public class LogCleanerTest {
             SupervisorUtils mockedSupervisorUtils = 
mock(SupervisorUtils.class);
             SupervisorUtils.setInstance(mockedSupervisorUtils);
 
-            LogCleaner logCleaner = new LogCleaner(stormConf, new 
DirectoryCleaner()) {
+            Map<String, Object> conf = Utils.readStormConfig();
+            WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, null) {
                 @Override
-                Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
+                public Map<String, File> identifyWorkerLogDirs(Set<File> 
logDirs) {
                     Map<String, File> ret = new HashMap<>();
                     ret.put("42", unexpectedDir1);
                     ret.put("007", expectedDir2);
@@ -299,6 +268,8 @@ public class LogCleanerTest {
                 }
             };
 
+            LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, 
new DirectoryCleaner(), null);
+
             
when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, 
Object.class))).thenReturn(idToHb);
             assertEquals(Sets.newSet(expectedDir2, expectedDir3), 
logCleaner.getDeadWorkerDirs(nowSecs, logDirs));
         } finally {
@@ -326,9 +297,13 @@ public class LogCleanerTest {
                 return null;
             }).when(mockUtils).forceDelete(anyString());
 
-            LogCleaner logCleaner = new LogCleaner(Utils.readStormConfig(), 
new DirectoryCleaner()) {
+
+            Map<String, Object> conf = Utils.readStormConfig();
+            WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, null);
+
+            LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, 
new DirectoryCleaner(), null) {
                 @Override
-                Set<File> selectDirsForCleanup(long nowMillis, String rootDir) 
{
+                Set<File> selectDirsForCleanup(long nowMillis) {
                     return Collections.emptySet();
                 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
----------------------------------------------------------------------
diff --git 
a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
new file mode 100644
index 0000000..b8b5d8c
--- /dev/null
+++ 
b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder;
+import org.apache.storm.daemon.logviewer.testsupport.MockFileBuilder;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WorkerLogsTest {
+
+    /**
+     * Build up workerid-workerlogdir map for the old workers' dirs.
+     */
+    @Test
+    public void testIdentifyWorkerLogDirs() throws Exception {
+        File port1Dir = new 
MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port1").build();
+        File mockMetaFile = new 
MockFileBuilder().setFileName("worker.yaml").build();
+
+        String expId = "id12345";
+        Map<String, File> expected = Collections.singletonMap(expId, port1Dir);
+
+        try {
+            SupervisorUtils mockedSupervisorUtils = 
mock(SupervisorUtils.class);
+            SupervisorUtils.setInstance(mockedSupervisorUtils);
+
+            Map<String, Object> stormConf = Utils.readStormConfig();
+            WorkerLogs workerLogs = new WorkerLogs(stormConf, port1Dir) {
+                @Override
+                public Optional<File> getMetadataFileForWorkerLogDir(File 
logDir) throws IOException {
+                    return Optional.of(mockMetaFile);
+                }
+
+                @Override
+                public String getWorkerIdFromMetadataFile(String metaFile) {
+                    return expId;
+                }
+            };
+
+            
when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, 
Object.class))).thenReturn(null);
+            assertEquals(expected, 
workerLogs.identifyWorkerLogDirs(Collections.singleton(port1Dir)));
+        } finally {
+            SupervisorUtils.resetInstance();
+        }
+    }
+
+}

Reply via email to