Repository: storm
Updated Branches:
  refs/heads/master e82052339 -> bfcbf2c8e


STORM-3245: Don't blow up if empty log dirs exist


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d012f6c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d012f6c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d012f6c3

Branch: refs/heads/master
Commit: d012f6c323d29a48ded29ff2ef86d77970a5d080
Parents: dad7410
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Oct 4 15:12:18 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Oct 4 15:12:18 2018 -0500

----------------------------------------------------------------------
 .../logviewer/utils/DirectoryCleaner.java       |  6 +--
 .../daemon/logviewer/utils/LogCleaner.java      |  9 +---
 .../daemon/logviewer/utils/WorkerLogs.java      | 47 ++++++++++++--------
 .../daemon/logviewer/utils/LogCleanerTest.java  | 24 ++++++----
 .../daemon/logviewer/utils/WorkerLogsTest.java  |  7 ++-
 5 files changed, 53 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
index 0b17c84..2509fc8 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
@@ -86,7 +86,7 @@ public class DirectoryCleaner {
      * @return number of files deleted
      */
     public DeletionMeta deleteOldestWhileTooLarge(List<File> dirs,
-                                                  long quota, boolean forPerDir, Set<String> activeDirs) throws IOException {
+                                                  long quota, boolean forPerDir, Set<File> activeDirs) throws IOException {
         long totalSize = 0;
         for (File dir : dirs) {
             try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
@@ -171,12 +171,12 @@ public class DirectoryCleaner {
         return new DeletionMeta(deletedSize, deletedFiles);
     }
 
-    private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException {
+    private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<File> activeDirs, File dir, File file) throws IOException {
         if (forPerDir) {
             return ACTIVE_LOG_PATTERN.matcher(file.getName()).matches();
         } else { // for global cleanup
             // for an active worker's dir, make sure for the last "/"
-            return activeDirs.contains(dir.getCanonicalPath()) ? ACTIVE_LOG_PATTERN.matcher(file.getName()).matches() :
+            return activeDirs.contains(dir) ? ACTIVE_LOG_PATTERN.matcher(file.getName()).matches() :
                 META_LOG_PATTERN.matcher(file.getName()).matches();
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
index 035fe3b..036d224 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
@@ -210,7 +210,7 @@ public class LogCleaner implements Runnable, Closeable {
     @VisibleForTesting
     DeletionMeta globalLogCleanup(long size) throws Exception {
         List<File> workerDirs = new ArrayList<>(workerLogs.getAllWorkerDirs());
-        Set<String> aliveWorkerDirs = new HashSet<>(workerLogs.getAliveWorkerDirs());
+        Set<File> aliveWorkerDirs = workerLogs.getAliveWorkerDirs();
 
         return directoryCleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs);
     }
@@ -235,12 +235,7 @@ public class LogCleaner implements Runnable, Closeable {
             return new TreeSet<>();
         } else {
             Set<String> aliveIds = workerLogs.getAliveIds(nowSecs);
-            Map<String, File> idToDir = workerLogs.identifyWorkerLogDirs(logDirs);
-
-            return idToDir.entrySet().stream()
-                    .filter(entry -> !aliveIds.contains(entry.getKey()))
-                    .map(Map.Entry::getValue)
-                    .collect(toCollection(TreeSet::new));
+            return workerLogs.getLogDirs(logDirs, (wid) -> !aliveIds.contains(wid));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index 8d9934c..3cd5921 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -38,6 +38,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
@@ -48,7 +49,6 @@ import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.jooq.lambda.Unchecked;
-import org.jooq.lambda.tuple.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory;
  * A class that knows about how to operate with worker log directory.
  */
 public class WorkerLogs {
-    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WorkerLogs.class);
 
     public static final String WORKER_YAML = "worker.yaml";
     
@@ -140,15 +140,10 @@ public class WorkerLogs {
     /**
      * Return a sorted set of java.io.Files that were written by workers that are now active.
      */
-    public SortedSet<String> getAliveWorkerDirs() {
+    public SortedSet<File> getAliveWorkerDirs() {
         Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
         Set<File> logDirs = getAllWorkerDirs();
-        Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
-
-        return idToDir.entrySet().stream()
-                .filter(entry -> aliveIds.contains(entry.getKey()))
-                .map(Unchecked.function(entry -> entry.getValue().getCanonicalPath()))
-                .collect(toCollection(TreeSet::new));
+        return getLogDirs(logDirs, (wid) -> aliveIds.contains(wid));
     }
 
     /**
@@ -172,7 +167,7 @@ public class WorkerLogs {
      */
     public String getWorkerIdFromMetadataFile(String metaFile) {
         Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile);
-        return ObjectReader.getString(map.get("worker-id"), null);
+        return ObjectReader.getString(map == null ? null : map.get("worker-id"), null);
     }
 
     /**
@@ -199,19 +194,33 @@ public class WorkerLogs {
     }
 
     /**
-     * Finds a worker ID for each directory in set and return it as map.
+     * Finds directories for specific worker ids that can be cleaned up.
      *
      * @param logDirs directories to check whether they're worker directories or not
-     * @return the pair of worker ID, directory. worker ID will be an empty string if the directory is not a worker directory.
+     * @param predicate a check on a worker id to see if the log dir should be included
+     * @return directories that can be cleaned up.
      */
-    public Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
+    public SortedSet<File> getLogDirs(Set<File> logDirs, Predicate<String> predicate) {
         // we could also make this static, but not to do it due to mock
-        return logDirs.stream().map(Unchecked.function(logDir -> {
-            Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
-
-            return metaFile.map(Unchecked.function(m -> new Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir)))
-                    .orElse(new Tuple2<>("", logDir));
-        })).collect(toMap(Tuple2::v1, Tuple2::v2));
+        TreeSet<File> ret = new TreeSet<>();
+        for (File logDir: logDirs) {
+            String workerId = "";
+            try {
+                Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
+                if (metaFile.isPresent()) {
+                    workerId = getWorkerIdFromMetadataFile(metaFile.get().getCanonicalPath());
+                    if (workerId == null) {
+                        workerId = "";
+                    }
+                }
+            } catch (IOException e) {
+                LOG.warn("Error trying to find worker.yaml in {}", logDir, e);
+            }
+            if (predicate.test(workerId)) {
+                ret.add(logDir);
+            }
+        }
+        return ret;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
index d85edea..92c5a97 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import java.util.function.Predicate;
 import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder;
 import org.apache.storm.daemon.logviewer.testsupport.MockRemovableFileBuilder;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
@@ -187,7 +188,7 @@ public class LogCleanerTest {
                 List<Path> paths = Arrays.stream(file.listFiles()).map(f -> mkMockPath(f)).collect(toList());
                 return mkDirectoryStream(paths);
             });
-            when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(String.class)))
+            when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(File.class)))
                     .thenCallRealMethod();
 
             long nowMillis = Time.currentTimeMillis();
@@ -225,8 +226,8 @@ public class LogCleanerTest {
             StormMetricsRegistry metricRegistry = new StormMetricsRegistry();
             WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, rootDir, metricRegistry) {
                 @Override
-                public SortedSet<String> getAliveWorkerDirs() {
-                    return new TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
+                public SortedSet<File> getAliveWorkerDirs() {
+                    return new TreeSet<>(Collections.singletonList(new File("/workers-artifacts/topo1/port1")));
                 }
             };
 
@@ -264,12 +265,17 @@ public class LogCleanerTest {
             StormMetricsRegistry metricRegistry = new StormMetricsRegistry();
             WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, null, metricRegistry) {
                 @Override
-                public Map<String, File> identifyWorkerLogDirs(Set<File> logDirs) {
-                    Map<String, File> ret = new HashMap<>();
-                    ret.put("42", unexpectedDir1);
-                    ret.put("007", expectedDir2);
-                    // this tests a directory with no yaml file thus no worker id
-                    ret.put("", expectedDir3);
+                public SortedSet<File> getLogDirs(Set<File> logDirs, Predicate<String> predicate) {
+                    TreeSet<File> ret = new TreeSet<>();
+                    if (predicate.test("42")) {
+                        ret.add(unexpectedDir1);
+                    }
+                    if (predicate.test("007")) {
+                        ret.add(expectedDir2);
+                    }
+                    if(predicate.test("")) {
+                        ret.add(expectedDir3);
+                    }
 
                     return ret;
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d012f6c3/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
index c7ca3dd..3b3a72f 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java
@@ -29,6 +29,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
+import java.util.SortedSet;
+import java.util.TreeSet;
 import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder;
 import org.apache.storm.daemon.logviewer.testsupport.MockFileBuilder;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
@@ -47,7 +49,8 @@ public class WorkerLogsTest {
         File mockMetaFile = new MockFileBuilder().setFileName("worker.yaml").build();
 
         String expId = "id12345";
-        Map<String, File> expected = Collections.singletonMap(expId, port1Dir);
+        SortedSet<File> expected = new TreeSet<>();
+        expected.add(port1Dir);
 
         try {
             SupervisorUtils mockedSupervisorUtils = mock(SupervisorUtils.class);
@@ -67,7 +70,7 @@ public class WorkerLogsTest {
             };
 
             when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, Object.class))).thenReturn(null);
-            assertEquals(expected, workerLogs.identifyWorkerLogDirs(Collections.singleton(port1Dir)));
+            assertEquals(expected, workerLogs.getLogDirs(Collections.singleton(port1Dir), (wid) -> true));
         } finally {
             SupervisorUtils.resetInstance();
         }

Reply via email to