Repository: storm
Updated Branches:
  refs/heads/master 1622f19c0 -> 3c53dd516
STORM-1280 port backtype.storm.daemon.logviewer to java
* apply recent change from logviewer.clj (0201ae7)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb5c247f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb5c247f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb5c247f

Branch: refs/heads/master
Commit: bb5c247f9ba595ec0e9dc88284da568011520aff
Parents: 44b268b
Author: Jungtaek Lim <[email protected]>
Authored: Fri Jul 14 11:56:28 2017 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Fri Jul 14 12:11:41 2017 +0900

----------------------------------------------------------------------
 .../storm/daemon/logviewer/LogviewerServer.java |   8 +-
 .../handler/LogviewerLogDownloadHandler.java    |   6 +-
 .../handler/LogviewerLogPageHandler.java        |   7 +-
 .../daemon/logviewer/utils/LogCleaner.java      |  82 +++--------
 .../daemon/logviewer/utils/WorkerLogs.java      | 141 +++++++++++++++++--
 .../logviewer/webapp/LogviewerApplication.java  |   6 +-
 .../handler/LogviewerLogPageHandlerTest.java    |   6 +-
 .../daemon/logviewer/utils/LogCleanerTest.java  |  81 ++++-------
 .../daemon/logviewer/utils/WorkerLogsTest.java  |  75 ++++++++++
 9 files changed, 274 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 2b26702..1b53809 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -23,10 +23,12 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
 import org.apache.storm.daemon.logviewer.utils.LogCleaner;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.ui.FilterConfiguration;
 import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Utils;
 import org.eclipse.jetty.server.Server;
@@ -38,6 +40,7 @@ import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -158,10 +161,13 @@ public class LogviewerServer implements AutoCloseable {
         Utils.setupDefaultUncaughtExceptionHandler();
         Map<String, Object> conf = Utils.readStormConfig();
 
+        String logRoot = ConfigUtils.workerArtifactsRoot(conf);
+        File logRootFile = new File(logRoot);
+        WorkerLogs workerLogs = new WorkerLogs(conf, logRootFile);
         DirectoryCleaner directoryCleaner = new DirectoryCleaner();
 
         try (LogviewerServer server = new LogviewerServer(conf);
-             LogCleaner logCleaner = new LogCleaner(conf, directoryCleaner)) {
+             LogCleaner logCleaner = new LogCleaner(conf, workerLogs, directoryCleaner, logRootFile)) {
             Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
             logCleaner.start();
             StormMetricsRegistry.startMetricsReporters(conf);


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
index b8f32f2..7529297 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
@@ -20,19 +20,23 @@ package org.apache.storm.daemon.logviewer.handler;
 
 import org.apache.storm.daemon.logviewer.utils.LogFileDownloader;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 
 import javax.ws.rs.core.Response;
 
 import java.io.IOException;
 
 public class LogviewerLogDownloadHandler {
+    private WorkerLogs workerLogs;
     private final LogFileDownloader logFileDownloadHelper;
 
-    public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) {
+    public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, WorkerLogs workerLogs, ResourceAuthorizer resourceAuthorizer) {
+        this.workerLogs = workerLogs;
         this.logFileDownloadHelper = new LogFileDownloader(logRoot, daemonLogRoot, resourceAuthorizer);
     }
 
     public Response downloadLogFile(String fileName, String user) throws IOException {
+        workerLogs.setLogFilePermission(fileName);
         return logFileDownloadHelper.downloadFile(fileName, user, false);
     }


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
index 0881b69..d2488f6 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
@@ -62,12 +62,15 @@ import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
 public class LogviewerLogPageHandler {
     private final String logRoot;
     private final String daemonLogRoot;
+    private final WorkerLogs workerLogs;
     private final ResourceAuthorizer resourceAuthorizer;
 
     public LogviewerLogPageHandler(String logRoot, String daemonLogRoot,
+                                   WorkerLogs workerLogs,
                                    ResourceAuthorizer resourceAuthorizer) {
         this.logRoot = logRoot;
         this.daemonLogRoot = daemonLogRoot;
+        this.workerLogs = workerLogs;
         this.resourceAuthorizer = resourceAuthorizer;
     }
 
@@ -75,7 +78,7 @@ public class LogviewerLogPageHandler {
         List<File> fileResults = null;
         if (topologyId == null) {
             if (port == null) {
-                fileResults = WorkerLogs.getAllLogsForRootDir(new File(logRoot));
+                fileResults = workerLogs.getAllLogsForRootDir();
             } else {
                 fileResults = new ArrayList<>();
 
@@ -130,6 +133,8 @@ public class LogviewerLogPageHandler {
     public Response logPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException {
         String rootDir = logRoot;
         if (resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) {
+            workerLogs.setLogFilePermission(fileName);
+
             File file = new File(rootDir, fileName).getCanonicalFile();
             String path = file.getCanonicalPath();
             boolean isZipFile = path.endsWith(".gz");


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
index cca2a52..6e4e073 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
@@ -22,12 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.apache.storm.StormTimer;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.jooq.lambda.Unchecked;
-import org.jooq.lambda.tuple.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,23 +60,23 @@ import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_M
 public class LogCleaner implements Runnable, Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
 
-    public static final String WORKER_YAML = "worker.yaml";
-
     private final Map<String, Object> stormConf;
     private final Integer intervalSecs;
-    private final String logRootDir;
+    private final File logRootDir;
     private final DirectoryCleaner directoryCleaner;
+    private final WorkerLogs workerLogs;
     private StormTimer logviewerCleanupTimer;
     private final long maxSumWorkerLogsSizeMb;
     private long maxPerWorkerLogsSizeMb;
 
-    public LogCleaner(Map<String, Object> stormConf, DirectoryCleaner directoryCleaner) {
-        String logRootDir = ConfigUtils.workerArtifactsRoot(stormConf);
-
+    public LogCleaner(Map<String, Object> stormConf, WorkerLogs workerLogs, DirectoryCleaner directoryCleaner,
+                      File logRootDir) {
         this.stormConf = stormConf;
         this.intervalSecs = ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_INTERVAL_SECS), null);
         this.logRootDir = logRootDir;
+        this.workerLogs = workerLogs;
         this.directoryCleaner = directoryCleaner;
 
         maxSumWorkerLogsSizeMb = ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB));
@@ -121,7 +119,7 @@
     public void run() {
         try {
             int nowSecs = Time.currentTimeSecs();
-            Set<File> oldLogDirs = selectDirsForCleanup(nowSecs * 1000, logRootDir);
+            Set<File> oldLogDirs = selectDirsForCleanup(nowSecs * 1000);
 
             SortedSet<File> deadWorkerDirs = getDeadWorkerDirs(nowSecs, oldLogDirs);
 
@@ -141,8 +139,8 @@
                 }
             }));
 
-            perWorkerDirCleanup(new File(logRootDir), maxPerWorkerLogsSizeMb * 1024 * 1024, directoryCleaner);
-            globalLogCleanup(new File(logRootDir), maxSumWorkerLogsSizeMb * 1024 * 1024, directoryCleaner);
+            perWorkerDirCleanup(maxPerWorkerLogsSizeMb * 1024 * 1024);
+            globalLogCleanup(maxSumWorkerLogsSizeMb * 1024 * 1024);
         } catch (Exception ex) {
             LOG.error("Exception while cleaning up old log.", ex);
         }
 
@@ -152,9 +150,9 @@
     /**
      * Delete the oldest files in each overloaded worker log dir.
      */
     @VisibleForTesting
-    List<Integer> perWorkerDirCleanup(File rootDir, long size, DirectoryCleaner cleaner) {
-        return WorkerLogs.getAllWorkerDirs(rootDir).stream()
-                .map(Unchecked.function(dir -> cleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), size, true, null)))
+    List<Integer> perWorkerDirCleanup(long size) {
+        return workerLogs.getAllWorkerDirs().stream()
+                .map(Unchecked.function(dir -> directoryCleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), size, true, null)))
                 .collect(toList());
     }
 
@@ -162,53 +160,11 @@
      * Delete the oldest files in overloaded worker-artifacts globally.
      */
     @VisibleForTesting
-    int globalLogCleanup(File rootDir, long size, DirectoryCleaner cleaner) throws Exception {
-        List<File> workerDirs = new ArrayList<>(WorkerLogs.getAllWorkerDirs(rootDir));
-        Set<String> aliveWorkerDirs = new HashSet<>(getAliveWorkerDirs(rootDir));
-
-        return cleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs);
-    }
-
-    /**
-     * Return a sorted set of java.io.Files that were written by workers that are now active.
-     */
-    @VisibleForTesting
-    SortedSet<String> getAliveWorkerDirs(File rootDir) throws Exception {
-        Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
-        Set<File> logDirs = WorkerLogs.getAllWorkerDirs(rootDir);
-        Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
-
-        return idToDir.entrySet().stream()
-                .filter(entry -> aliveIds.contains(entry.getKey()))
-                .map(Unchecked.function(entry -> entry.getValue().getCanonicalPath()))
-                .collect(toCollection(TreeSet::new));
-    }
+    int globalLogCleanup(long size) throws Exception {
+        List<File> workerDirs = new ArrayList<>(workerLogs.getAllWorkerDirs());
+        Set<String> aliveWorkerDirs = new HashSet<>(workerLogs.getAliveWorkerDirs());
 
-    @VisibleForTesting
-    Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
-        return logDirs.stream().map(Unchecked.function(logDir -> {
-            Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
-
-            return metaFile.map(Unchecked.function(m -> new Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir)))
-                    .orElse(new Tuple2<>("", logDir));
-        })).collect(toMap(Tuple2::v1, Tuple2::v2));
-    }
-
-    @VisibleForTesting
-    Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws IOException {
-        File metaFile = new File(logDir, WORKER_YAML);
-        if (metaFile.exists()) {
-            return Optional.of(metaFile);
-        } else {
-            LOG.warn("Could not find {} to clean up for {}", metaFile.getCanonicalPath(), logDir);
-            return Optional.empty();
-        }
-    }
-
-    @VisibleForTesting
-    String getWorkerIdFromMetadataFile(String metaFile) {
-        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile);
-        return ObjectReader.getString(map.get("worker-id"), null);
+        return directoryCleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs);
     }
 
     private Set<String> getAliveIds(int nowSecs) throws Exception {
@@ -238,8 +194,8 @@
         if (logDirs.isEmpty()) {
             return new TreeSet<>();
         } else {
-            Set<String> aliveIds = getAliveIds(nowSecs);
-            Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
+            Set<String> aliveIds = workerLogs.getAliveIds(nowSecs);
+            Map<String, File> idToDir = workerLogs.identifyWorkerLogDirs(logDirs);
 
             return idToDir.entrySet().stream()
                     .filter(entry -> !aliveIds.contains(entry.getKey()))
@@ -249,10 +205,10 @@ public class LogCleaner implements Runnable, Closeable {
     }
 
     @VisibleForTesting
-    Set<File> selectDirsForCleanup(long nowMillis, String rootDir) {
+    Set<File> selectDirsForCleanup(long nowMillis) {
         FileFilter fileFilter = mkFileFilterForLogCleanup(nowMillis);
 
-        return Arrays.stream(new File(rootDir).listFiles())
+        return Arrays.stream(logRootDir.listFiles())
                 .flatMap(topoDir -> Arrays.stream(topoDir.listFiles(fileFilter)))
                 .collect(toCollection(TreeSet::new));
     }


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index 1d70b31..6018b79 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -1,41 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.storm.daemon.logviewer.utils;
 
+import com.google.common.collect.Lists;
+import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
+import org.jooq.lambda.Unchecked;
+import org.jooq.lambda.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.storm.Config.SUPERVISOR_RUN_WORKER_AS_USER;
+import static org.apache.storm.Config.TOPOLOGY_SUBMITTER_USER;
 import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast;
 
 public class WorkerLogs {
+    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
+
+    public static final String WORKER_YAML = "worker.yaml";
+
+    private final Map<String, Object> stormConf;
+    private final File logRootDir;
 
-    private WorkerLogs() {
+    public WorkerLogs(Map<String, Object> stormConf, File logRootDir) {
+        this.stormConf = stormConf;
+        this.logRootDir = logRootDir;
     }
 
-    /**
-     * Return the path of the worker log with the format of topoId/port/worker.log.*
-     */
-    public static String getTopologyPortWorkerLog(File file) {
-        try {
-            String[] splitted = file.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR);
-            List<String> split = takeLast(Arrays.asList(splitted), 3);
+    public void setLogFilePermission(String fileName) throws IOException {
+        File file = new File(logRootDir, fileName).getCanonicalFile();
+        boolean runAsUser = ObjectReader.getBoolean(stormConf.get(SUPERVISOR_RUN_WORKER_AS_USER), false);
+        File parent = new File(logRootDir, fileName).getParentFile();
+        Optional<File> mdFile = (parent == null) ? Optional.empty() : getMetadataFileForWorkerLogDir(parent);
+        Optional<String> topoOwner = mdFile.isPresent() ?
+                Optional.of(getTopologyOwnerFromMetadataFile(mdFile.get().getCanonicalPath())) :
+                Optional.empty();
 
-            return String.join(Utils.FILE_PATH_SEPARATOR, split);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        if (runAsUser && topoOwner.isPresent() && file.exists() && !Files.isReadable(file.toPath())) {
+            LOG.debug("Setting permissions on file {} with topo-owner {}", fileName, topoOwner);
+            ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
+                    Lists.newArrayList("blob", file.getCanonicalPath()), null,
+                    "setup group read permissions for file: " + fileName);
         }
     }
 
-    public static List<File> getAllLogsForRootDir(File logDir) throws IOException {
+    public List<File> getAllLogsForRootDir() throws IOException {
         List<File> files = new ArrayList<>();
 
-        Set<File> topoDirFiles = getAllWorkerDirs(logDir);
+        Set<File> topoDirFiles = getAllWorkerDirs();
         if (topoDirFiles != null) {
             for (File portDir : topoDirFiles) {
                 files.addAll(DirectoryCleaner.getFilesForDir(portDir));
@@ -45,8 +90,8 @@ public class WorkerLogs {
         return files;
     }
 
-    public static Set<File> getAllWorkerDirs(File rootDir) {
-        File[] rootDirFiles = rootDir.listFiles();
+    public Set<File> getAllWorkerDirs() {
+        File[] rootDirFiles = logRootDir.listFiles();
         if (rootDirFiles != null) {
             return Arrays.stream(rootDirFiles).flatMap(topoDir -> {
                 File[] topoFiles = topoDir.listFiles();
@@ -57,4 +102,70 @@ public class WorkerLogs {
         return new TreeSet<>();
     }
 
+    /**
+     * Return a sorted set of java.io.Files that were written by workers that are now active.
+     */
+    public SortedSet<String> getAliveWorkerDirs() throws Exception {
+        Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
+        Set<File> logDirs = getAllWorkerDirs();
+        Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
+
+        return idToDir.entrySet().stream()
+                .filter(entry -> aliveIds.contains(entry.getKey()))
+                .map(Unchecked.function(entry -> entry.getValue().getCanonicalPath()))
+                .collect(toCollection(TreeSet::new));
+    }
+
+    public Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws IOException {
+        File metaFile = new File(logDir, WORKER_YAML);
+        if (metaFile.exists()) {
+            return Optional.of(metaFile);
+        } else {
+            LOG.warn("Could not find {} to clean up for {}", metaFile.getCanonicalPath(), logDir);
+            return Optional.empty();
+        }
+    }
+
+    public String getWorkerIdFromMetadataFile(String metaFile) {
+        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile);
+        return ObjectReader.getString(map.get("worker-id"), null);
+    }
+
+    public String getTopologyOwnerFromMetadataFile(String metaFile) {
+        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile);
+        return ObjectReader.getString(map.get(TOPOLOGY_SUBMITTER_USER), null);
+    }
+
+    public Set<String> getAliveIds(int nowSecs) throws Exception {
+        return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
+                .filter(entry -> Objects.nonNull(entry.getValue())
+                        && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
+                .map(Map.Entry::getKey)
+                .collect(toCollection(TreeSet::new));
+    }
+
+    public Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
+        // we could also make this static, but not to do it due to mock
+        return logDirs.stream().map(Unchecked.function(logDir -> {
+            Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
+
+            return metaFile.map(Unchecked.function(m -> new Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir)))
+                    .orElse(new Tuple2<>("", logDir));
+        })).collect(toMap(Tuple2::v1, Tuple2::v2));
+    }
+
+    /**
+     * Return the path of the worker log with the format of topoId/port/worker.log.*
+     */
+    public static String getTopologyPortWorkerLog(File file) {
+        try {
+            String[] splitted = file.getCanonicalPath().split(Utils.FILE_PATH_SEPARATOR);
+            List<String> split = takeLast(Arrays.asList(splitted), 3);
+
+            return String.join(Utils.FILE_PATH_SEPARATOR, split);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
index cbf478c..32a723e 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerApplication.java
@@ -28,6 +28,7 @@ import org.apache.storm.daemon.logviewer.handler.LogviewerProfileHandler;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
 import org.apache.storm.daemon.logviewer.handler.LogviewerLogDownloadHandler;
 import org.apache.storm.daemon.logviewer.handler.LogviewerLogPageHandler;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IHttpCredentialsPlugin;
 import org.apache.storm.utils.ConfigUtils;
@@ -55,11 +56,12 @@ public class LogviewerApplication extends Application {
         String daemonLogRoot = logRootDir(ObjectReader.getString(stormConf.get(LOGVIEWER_APPENDER_NAME)));
 
         ResourceAuthorizer resourceAuthorizer = new ResourceAuthorizer(stormConf);
+        WorkerLogs workerLogs = new WorkerLogs(stormConf, new File(logRoot));
 
-        LogviewerLogPageHandler logviewer = new LogviewerLogPageHandler(logRoot, daemonLogRoot, resourceAuthorizer);
+        LogviewerLogPageHandler logviewer = new LogviewerLogPageHandler(logRoot, daemonLogRoot, workerLogs, resourceAuthorizer);
         LogviewerProfileHandler profileHandler = new LogviewerProfileHandler(logRoot, resourceAuthorizer);
         LogviewerLogDownloadHandler logDownloadHandler = new LogviewerLogDownloadHandler(logRoot, daemonLogRoot,
-                resourceAuthorizer);
+                workerLogs, resourceAuthorizer);
         LogviewerLogSearchHandler logSearchHandler = new LogviewerLogSearchHandler(stormConf, logRoot, daemonLogRoot,
                 resourceAuthorizer);
         IHttpCredentialsPlugin httpCredsHandler = AuthUtils.GetUiHttpCredentialsPlugin(stormConf);


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
index 4b76abd..2c05972 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.storm.daemon.logviewer.handler;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.utils.Utils;
 import org.assertj.core.util.Lists;
 import org.junit.Test;
@@ -31,6 +32,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.FileAttribute;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -56,9 +58,9 @@ public class LogviewerLogPageHandlerTest {
         file3.createNewFile();
 
         String origin = "www.origin.server.net";
-
+        Map<String, Object> stormConf = Utils.readStormConfig();
         LogviewerLogPageHandler handler = new LogviewerLogPageHandler(rootPath, null,
-                new ResourceAuthorizer(Utils.readStormConfig()));
+                new WorkerLogs(stormConf, new File(rootPath)), new ResourceAuthorizer(stormConf));
 
         Response expectedAll = LogviewerResponseBuilder.buildSuccessJsonResponse(
                 Lists.newArrayList("topoA/port1/worker.log", "topoA/port2/worker.log", "topoB/port1/worker.log"),


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
index 0c7d69c..21f4e9e 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
@@ -80,10 +80,12 @@
         conf.put(LOGVIEWER_CLEANUP_AGE_MINS, 60);
         conf.put(LOGVIEWER_CLEANUP_INTERVAL_SECS, 300);
 
-        LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner);
+        WorkerLogs workerLogs = new WorkerLogs(conf, null);
+
+        LogCleaner logCleaner = new LogCleaner(conf, workerLogs, mockDirectoryCleaner, null);
 
         long nowMillis = Time.currentTimeMillis();
-        long cutoffMillis = new LogCleaner(conf, mockDirectoryCleaner).cleanupCutoffAgeMillis(nowMillis);
+        long cutoffMillis = logCleaner.cleanupCutoffAgeMillis(nowMillis);
         long oldMtimeMillis = cutoffMillis - 500;
         long newMtimeMillis = cutoffMillis + 500;
 
@@ -125,9 +127,6 @@
         when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(String.class)))
                 .thenCallRealMethod();
 
-        Map<String, Object> conf = Utils.readStormConfig();
-        LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner);
-
         long nowMillis = Time.currentTimeMillis();
 
         List<File> files1 = Seq.range(0, 10).map(idx -> new MockFileBuilder().setFileName("A" + idx)
@@ -157,7 +156,11 @@
         File rootDir = new MockDirectoryBuilder().setDirName("/workers-artifacts")
                 .setFiles(rootFiles).build();
 
-        List<Integer> deletedFiles = logCleaner.perWorkerDirCleanup(rootDir, 1200, mockDirectoryCleaner);
+        Map<String, Object> conf = Utils.readStormConfig();
+        WorkerLogs workerLogs = new WorkerLogs(conf, rootDir);
+        LogCleaner logCleaner = new LogCleaner(conf, workerLogs, mockDirectoryCleaner, rootDir);
+
+        List<Integer> deletedFiles = logCleaner.perWorkerDirCleanup(1200);
         assertEquals(Integer.valueOf(4), deletedFiles.get(0));
         assertEquals(Integer.valueOf(4), deletedFiles.get(1));
         assertEquals(Integer.valueOf(4), deletedFiles.get(deletedFiles.size() - 1));
 
@@ -182,15 +185,6 @@
         when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(String.class)))
                 .thenCallRealMethod();
 
-        Map<String, Object> conf = Utils.readStormConfig();
-
-        LogCleaner logCleaner = new LogCleaner(conf, mockDirectoryCleaner) {
-            @Override
-            SortedSet<String> getAliveWorkerDirs(File rootDir) throws Exception {
-                return new TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
-            }
-        };
-
         long nowMillis = Time.currentTimeMillis();
 
         List<File> files1 = Seq.range(0, 10).map(idx -> new MockFileBuilder().setFileName("A" + idx + ".log")
@@ -222,45 +216,19 @@
         File rootDir = new MockDirectoryBuilder().setDirName("/workers-artifacts")
                 .setFiles(rootFiles).build();
 
-        int deletedFiles = logCleaner.globalLogCleanup(rootDir, 2400, mockDirectoryCleaner);
-        assertEquals(18, deletedFiles);
-        } finally {
-            Utils.setInstance(prevUtils);
-        }
-    }
-
-    /**
-     * Build up workerid-workerlogdir map for the old workers' dirs.
-     */
-    @Test
-    public void testIdentifyWorkerLogDirs() throws Exception {
-        File port1Dir = new MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port1").build();
-        File mockMetaFile = new MockFileBuilder().setFileName("worker.yaml").build();
-
-        String expId = "id12345";
-        Map<String, File> expected = Collections.singletonMap(expId, port1Dir);
-
-        try {
-            SupervisorUtils mockedSupervisorUtils = mock(SupervisorUtils.class);
-            SupervisorUtils.setInstance(mockedSupervisorUtils);
-
-            Map<String, Object> stormConf = Utils.readStormConfig();
-            LogCleaner logCleaner = new LogCleaner(stormConf, new DirectoryCleaner()) {
-                @Override
-                Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws IOException {
-                    return Optional.of(mockMetaFile);
-                }
-
+        Map<String, Object> conf = Utils.readStormConfig();
+        WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, rootDir) {
             @Override
-            String getWorkerIdFromMetadataFile(String metaFile) {
-                return expId;
+            public SortedSet<String> getAliveWorkerDirs() throws Exception {
+                return new TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
             }
         };
-        when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, Object.class))).thenReturn(null);
-        assertEquals(expected, logCleaner.identifyWorkerLogDirs(Collections.singleton(port1Dir)));
+        LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, mockDirectoryCleaner, rootDir);
+
+        int deletedFiles = logCleaner.globalLogCleanup(2400);
+        assertEquals(18, deletedFiles);
     } finally {
-        SupervisorUtils.resetInstance();
+        Utils.setInstance(prevUtils);
     }
 }
 
@@ -286,9 +254,10 @@
             SupervisorUtils mockedSupervisorUtils = mock(SupervisorUtils.class);
             SupervisorUtils.setInstance(mockedSupervisorUtils);
 
-            LogCleaner logCleaner = new LogCleaner(stormConf, new DirectoryCleaner()) {
+            Map<String, Object> conf = Utils.readStormConfig();
+            WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, null) {
                 @Override
-                Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
+                public Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
                     Map<String, File> ret = new HashMap<>();
                     ret.put("42", unexpectedDir1);
                     ret.put("007", expectedDir2);
@@ -299,6 +268,8 @@
                 }
             };
+            LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, new DirectoryCleaner(), null);
+
             when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, Object.class))).thenReturn(idToHb);
 
             assertEquals(Sets.newSet(expectedDir2, expectedDir3), logCleaner.getDeadWorkerDirs(nowSecs, logDirs));
         } finally {
@@ -326,9 +297,13 @@
             return null;
         }).when(mockUtils).forceDelete(anyString());
 
-        LogCleaner logCleaner = new LogCleaner(Utils.readStormConfig(), new DirectoryCleaner()) {
+
+        Map<String, Object> conf = Utils.readStormConfig();
+        WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, null);
+
+        LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, new DirectoryCleaner(), null) {
             @Override
-            Set<File> selectDirsForCleanup(long nowMillis, String rootDir) {
+            Set<File> selectDirsForCleanup(long nowMillis) {
                 return Collections.emptySet();
             }


http://git-wip-us.apache.org/repos/asf/storm/blob/bb5c247f/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
new file mode 100644
index 0000000..b8b5d8c
--- /dev/null
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder;
+import org.apache.storm.daemon.logviewer.testsupport.MockFileBuilder;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WorkerLogsTest {
+
+    /**
+     * Build up workerid-workerlogdir map for the old workers' dirs.
+     */
+    @Test
+    public void testIdentifyWorkerLogDirs() throws Exception {
+        File port1Dir = new MockDirectoryBuilder().setDirName("/workers-artifacts/topo1/port1").build();
+        File mockMetaFile = new MockFileBuilder().setFileName("worker.yaml").build();
+
+        String expId = "id12345";
+        Map<String, File> expected = Collections.singletonMap(expId, port1Dir);
+
+        try {
+            SupervisorUtils mockedSupervisorUtils = mock(SupervisorUtils.class);
+            SupervisorUtils.setInstance(mockedSupervisorUtils);
+
+            Map<String, Object> stormConf = Utils.readStormConfig();
+            WorkerLogs workerLogs = new WorkerLogs(stormConf, port1Dir) {
+                @Override
+                public Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws IOException {
+                    return Optional.of(mockMetaFile);
+                }
+
+                @Override
+                public String getWorkerIdFromMetadataFile(String metaFile) {
+                    return expId;
+                }
+            };
+
+            when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, Object.class))).thenReturn(null);
+            assertEquals(expected, workerLogs.identifyWorkerLogDirs(Collections.singleton(port1Dir)));
+        } finally {
+            SupervisorUtils.resetInstance();
+        }
+    }
+
+}
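
----------------------------------------------------------------------

A sketch of the new wiring (not part of the commit; condensed from the
LogviewerServer.main changes above): after this change, LogCleaner no longer
resolves the worker-artifacts root or reads worker.yaml itself. Those helpers
moved into the new WorkerLogs utility, and LogCleaner is handed its
collaborators through the constructor. The wrapper class name below is
illustrative, and error handling plus metrics reporting are omitted.

    import java.io.File;
    import java.util.Map;

    import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
    import org.apache.storm.daemon.logviewer.utils.LogCleaner;
    import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
    import org.apache.storm.utils.ConfigUtils;
    import org.apache.storm.utils.Utils;

    public class LogviewerWiringSketch {  // illustrative name, not in the commit
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = Utils.readStormConfig();

            // The worker-artifacts root is resolved once and shared by both objects.
            File logRoot = new File(ConfigUtils.workerArtifactsRoot(conf));

            // WorkerLogs now owns worker.yaml lookup, alive-worker detection and
            // log-file permission setup (formerly private helpers of LogCleaner).
            WorkerLogs workerLogs = new WorkerLogs(conf, logRoot);

            // LogCleaner receives its collaborators instead of constructing them.
            try (LogCleaner logCleaner = new LogCleaner(conf, workerLogs, new DirectoryCleaner(), logRoot)) {
                logCleaner.start();
                // ... the real main also starts the logviewer web server here.
            }
        }
    }

The tests exercise exactly this seam: LogCleanerTest now stubs WorkerLogs by
overriding getAliveWorkerDirs() or identifyWorkerLogDirs() instead of
subclassing LogCleaner, so cleanup policy and worker-metadata access can be
tested independently.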
