Repository: storm Updated Branches: refs/heads/master e82052339 -> bfcbf2c8e
STORM-3245: Don't blow up if empty log dirs exist Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d012f6c3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d012f6c3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d012f6c3 Branch: refs/heads/master Commit: d012f6c323d29a48ded29ff2ef86d77970a5d080 Parents: dad7410 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Thu Oct 4 15:12:18 2018 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Thu Oct 4 15:12:18 2018 -0500 ---------------------------------------------------------------------- .../logviewer/utils/DirectoryCleaner.java | 6 +-- .../daemon/logviewer/utils/LogCleaner.java | 9 +--- .../daemon/logviewer/utils/WorkerLogs.java | 47 ++++++++++++-------- .../daemon/logviewer/utils/LogCleanerTest.java | 24 ++++++---- .../daemon/logviewer/utils/WorkerLogsTest.java | 7 ++- 5 files changed, 53 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java index 0b17c84..2509fc8 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java @@ -86,7 +86,7 @@ public class DirectoryCleaner { * @return number of files deleted */ public DeletionMeta deleteOldestWhileTooLarge(List<File> dirs, - long quota, boolean forPerDir, Set<String> activeDirs) throws IOException { + long quota, boolean forPerDir, Set<File> activeDirs) throws IOException { long totalSize = 0; for (File dir : dirs) { try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { @@ -171,12 +171,12 @@ public class DirectoryCleaner { return new DeletionMeta(deletedSize, deletedFiles); } - private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException { + private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<File> activeDirs, File dir, File file) throws IOException { if (forPerDir) { return ACTIVE_LOG_PATTERN.matcher(file.getName()).matches(); } else { // for global cleanup // for an active worker's dir, make sure for the last "/" - return activeDirs.contains(dir.getCanonicalPath()) ? ACTIVE_LOG_PATTERN.matcher(file.getName()).matches() : + return activeDirs.contains(dir) ? ACTIVE_LOG_PATTERN.matcher(file.getName()).matches() : META_LOG_PATTERN.matcher(file.getName()).matches(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java index 035fe3b..036d224 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java @@ -210,7 +210,7 @@ public class LogCleaner implements Runnable, Closeable { @VisibleForTesting DeletionMeta globalLogCleanup(long size) throws Exception { List<File> workerDirs = new ArrayList<>(workerLogs.getAllWorkerDirs()); - Set<String> aliveWorkerDirs = new HashSet<>(workerLogs.getAliveWorkerDirs()); + Set<File> aliveWorkerDirs = workerLogs.getAliveWorkerDirs(); return directoryCleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs); } @@ -235,12 +235,7 @@ public class LogCleaner implements Runnable, Closeable { return new TreeSet<>(); } else { Set<String> aliveIds = workerLogs.getAliveIds(nowSecs); - Map<String, File> idToDir = workerLogs.identifyWorkerLogDirs(logDirs); - - return idToDir.entrySet().stream() - .filter(entry -> !aliveIds.contains(entry.getKey())) - .map(Map.Entry::getValue) - .collect(toCollection(TreeSet::new)); + return workerLogs.getLogDirs(logDirs, (wid) -> !aliveIds.contains(wid)); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java index 8d9934c..3cd5921 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.storm.daemon.supervisor.ClientSupervisorUtils; @@ -48,7 +49,6 @@ import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.jooq.lambda.Unchecked; -import org.jooq.lambda.tuple.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory; * A class that knows about how to operate with worker log directory. */ public class WorkerLogs { - private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); + private static final Logger LOG = LoggerFactory.getLogger(WorkerLogs.class); public static final String WORKER_YAML = "worker.yaml"; @@ -140,15 +140,10 @@ public class WorkerLogs { /** * Return a sorted set of java.io.Files that were written by workers that are now active. */ - public SortedSet<String> getAliveWorkerDirs() { + public SortedSet<File> getAliveWorkerDirs() { Set<String> aliveIds = getAliveIds(Time.currentTimeSecs()); Set<File> logDirs = getAllWorkerDirs(); - Map<String, File> idToDir = identifyWorkerLogDirs(logDirs); - - return idToDir.entrySet().stream() - .filter(entry -> aliveIds.contains(entry.getKey())) - .map(Unchecked.function(entry -> entry.getValue().getCanonicalPath())) - .collect(toCollection(TreeSet::new)); + return getLogDirs(logDirs, (wid) -> aliveIds.contains(wid)); } /** @@ -172,7 +167,7 @@ public class WorkerLogs { */ public String getWorkerIdFromMetadataFile(String metaFile) { Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile); - return ObjectReader.getString(map.get("worker-id"), null); + return ObjectReader.getString(map == null ? null : map.get("worker-id"), null); } /** @@ -199,19 +194,33 @@ public class WorkerLogs { } /** - * Finds a worker ID for each directory in set and return it as map. + * Finds directories for specific worker ids that can be cleaned up. * * @param logDirs directories to check whether they're worker directories or not - * @return the pair of worker ID, directory. worker ID will be an empty string if the directory is not a worker directory. + * @param predicate a check on a worker id to see if the log dir should be included + * @return directories that can be cleaned up. */ - public Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) { + public SortedSet<File> getLogDirs(Set<File> logDirs, Predicate<String> predicate) { // we could also make this static, but not to do it due to mock - return logDirs.stream().map(Unchecked.function(logDir -> { - Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir); - - return metaFile.map(Unchecked.function(m -> new Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir))) - .orElse(new Tuple2<>("", logDir)); - })).collect(toMap(Tuple2::v1, Tuple2::v2)); + TreeSet<File> ret = new TreeSet<>(); + for (File logDir: logDirs) { + String workerId = ""; + try { + Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir); + if (metaFile.isPresent()) { + workerId = getWorkerIdFromMetadataFile(metaFile.get().getCanonicalPath()); + if (workerId == null) { + workerId = ""; + } + } + } catch (IOException e) { + LOG.warn("Error trying to find worker.yaml in {}", logDir, e); + } + if (predicate.test(workerId)) { + ret.add(logDir); + } + } + return ret; } /** http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java index d85edea..92c5a97 100644 --- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java +++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Predicate; import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder; import org.apache.storm.daemon.logviewer.testsupport.MockRemovableFileBuilder; import org.apache.storm.daemon.supervisor.SupervisorUtils; @@ -187,7 +188,7 @@ public class LogCleanerTest { List<Path> paths = Arrays.stream(file.listFiles()).map(f -> mkMockPath(f)).collect(toList()); return mkDirectoryStream(paths); }); - when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(String.class))) + when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(File.class))) .thenCallRealMethod(); long nowMillis = Time.currentTimeMillis(); @@ -225,8 +226,8 @@ public class LogCleanerTest { StormMetricsRegistry metricRegistry = new StormMetricsRegistry(); WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, rootDir, metricRegistry) { @Override - public SortedSet<String> getAliveWorkerDirs() { - return new TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1")); + public SortedSet<File> getAliveWorkerDirs() { + return new TreeSet<>(Collections.singletonList(new File("/workers-artifacts/topo1/port1"))); } }; @@ -264,12 +265,17 @@ public class LogCleanerTest { StormMetricsRegistry metricRegistry = new StormMetricsRegistry(); WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, null, metricRegistry) { @Override - public Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) { - Map<String, File> ret = new HashMap<>(); - ret.put("42", unexpectedDir1); - ret.put("007", expectedDir2); - // this tests a directory with no yaml file thus no worker id - ret.put("", expectedDir3); + public SortedSet<File> getLogDirs(Set<File> logDirs, Predicate<String> predicate) { + TreeSet<File> ret = new TreeSet<>(); + if (predicate.test("42")) { + ret.add(unexpectedDir1); + } + if (predicate.test("007")) { + ret.add(expectedDir2); + } + if(predicate.test("")) { + ret.add(expectedDir3); + } return ret; } http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java index c7ca3dd..3b3a72f 100644 --- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java +++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java @@ -29,6 +29,8 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder; import org.apache.storm.daemon.logviewer.testsupport.MockFileBuilder; import org.apache.storm.daemon.supervisor.SupervisorUtils; @@ -47,7 +49,8 @@ public class WorkerLogsTest { File mockMetaFile = new MockFileBuilder().setFileName("worker.yaml").build(); String expId = "id12345"; - Map<String, File> expected = Collections.singletonMap(expId, port1Dir); + SortedSet<File> expected = new TreeSet<>(); + expected.add(port1Dir); try { SupervisorUtils mockedSupervisorUtils = mock(SupervisorUtils.class); @@ -67,7 +70,7 @@ public class WorkerLogsTest { }; when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, Object.class))).thenReturn(null); - assertEquals(expected, workerLogs.identifyWorkerLogDirs(Collections.singleton(port1Dir))); + assertEquals(expected, workerLogs.getLogDirs(Collections.singleton(port1Dir), (wid) -> true)); } finally { SupervisorUtils.resetInstance(); }