STORM-1670 version store should consider only active/finished version files
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c050020 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c050020 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c050020 Branch: refs/heads/master Commit: 4c0500206509b205769b8a1a17a17f77d7772fa7 Parents: 69ec8ff Author: Satish Duggana <[email protected]> Authored: Fri Apr 1 16:29:53 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Sat Apr 2 08:00:44 2016 +0530 ---------------------------------------------------------------------- .../org/apache/storm/utils/VersionedStore.java | 52 ++++++++++++-------- 1 file changed, 32 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4c050020/storm-core/src/jvm/org/apache/storm/utils/VersionedStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/VersionedStore.java b/storm-core/src/jvm/org/apache/storm/utils/VersionedStore.java index bbaf0f3..49c736f 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/VersionedStore.java +++ b/storm-core/src/jvm/org/apache/storm/utils/VersionedStore.java @@ -18,9 +18,13 @@ package org.apache.storm.utils; import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.io.File; @@ -121,10 +125,12 @@ public class VersionedStore { } HashSet<Long> keepers = new HashSet<Long>(versions); - for(String p: listDir(_root)) { - Long v = parseVersion(p); - if(v!=null && !keepers.contains(v)) { - deleteVersion(v); + try(DirectoryStream<Path> directoryStream = Files.newDirectoryStream(new File(_root).toPath())) { + for (Path path : directoryStream) { + Long v = 
parseVersion(path.toAbsolutePath().toString()); + if (v != null && !keepers.contains(v)) { + deleteVersion(v); + } } } } @@ -133,15 +139,18 @@ public class VersionedStore { * Sorted from most recent to oldest */ public List<Long> getAllVersions() throws IOException { - List<Long> ret = new ArrayList<Long>(); - for(String s: listDir(_root)) { - if(s.endsWith(FINISHED_VERSION_SUFFIX)) { - ret.add(validateAndGetVersion(s)); + List<Long> versions = new ArrayList<Long>(); + try(DirectoryStream<Path> pathDirectoryStream = listDirWithFinishedFiles(_root)) { + for (Path path : pathDirectoryStream) { + String absolutePath = path.toAbsolutePath().toString(); + if (absolutePath.endsWith(FINISHED_VERSION_SUFFIX)) { + versions.add(validateAndGetVersion(absolutePath)); + } } } - Collections.sort(ret); - Collections.reverse(ret); - return ret; + Collections.sort(versions); + Collections.reverse(versions); + return versions; } private String tokenPath(long version) { @@ -173,15 +182,18 @@ public class VersionedStore { private void mkdirs(String path) throws IOException { new File(path).mkdirs(); } - - private List<String> listDir(String dir) throws IOException { - List<String> ret = new ArrayList<String>(); - File[] contents = new File(dir).listFiles(); - if(contents!=null) { - for(File f: contents) { - ret.add(f.getAbsolutePath()); + + /** + * Return files which have both original and finished versions. + */ + private DirectoryStream<Path> listDirWithFinishedFiles(String dir) throws IOException { + return Files.newDirectoryStream(new File(dir).toPath(), new DirectoryStream.Filter<Path>() { + @Override + public boolean accept(Path path) throws IOException { + final String filePath = path.toAbsolutePath().toString(); + return filePath.endsWith(FINISHED_VERSION_SUFFIX) && + new File(filePath.substring(0, filePath.length() - FINISHED_VERSION_SUFFIX.length())).exists(); } - } - return ret; + }); } }
