STORM-307: reset LocalState if files are corrupt

* allow supervisor to start if LocalState was corrupted
Change-Id: I3dbf957dd2e37c9c6be39dab29a9a71e99b7a2c3 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2ecd44a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2ecd44a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2ecd44a Branch: refs/heads/security Commit: b2ecd44a7ee13d6c98171c63d056bdef481d6e0f Parents: 1babd83 Author: Wurstmeister <[email protected]> Authored: Sun Oct 5 21:46:24 2014 +0100 Committer: Wurstmeister <[email protected]> Committed: Sun Oct 5 21:46:24 2014 +0100 ---------------------------------------------------------------------- .../jvm/backtype/storm/utils/LocalState.java | 28 ++++++++++++++------ .../clj/backtype/storm/local_state_test.clj | 14 +++++++++- 2 files changed, 33 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b2ecd44a/storm-core/src/jvm/backtype/storm/utils/LocalState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java index 0d0ae07..f58e79c 100644 --- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java +++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java @@ -18,6 +18,8 @@ package backtype.storm.utils; import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.util.Map; @@ -30,25 +32,35 @@ import java.io.IOException; * Every read/write hits disk. 
*/ public class LocalState { + public static Logger LOG = LoggerFactory.getLogger(LocalState.class); + private VersionedStore _vs; public LocalState(String backingDir) throws IOException { _vs = new VersionedStore(backingDir); } - + public synchronized Map<Object, Object> snapshot() throws IOException { int attempts = 0; + Map<Object, Object> result = new HashMap<Object, Object>(); while(true) { String latestPath = _vs.mostRecentVersionPath(); - if(latestPath==null) return new HashMap<Object, Object>(); - try { - return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath))); - } catch(IOException e) { - attempts++; - if(attempts >= 10) { - throw e; + if(latestPath != null) { + try { + byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath)); + if (serialized.length == 0) { + LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath); + } else { + result = (Map<Object, Object>) Utils.deserialize(serialized); + } + } catch (IOException e) { + attempts++; + if (attempts >= 10) { + throw e; + } } } + return result; } } http://git-wip-us.apache.org/repos/asf/storm/blob/b2ecd44a/storm-core/test/clj/backtype/storm/local_state_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj index ba2b969..4bd58ec 100644 --- a/storm-core/test/clj/backtype/storm/local_state_test.clj +++ b/storm-core/test/clj/backtype/storm/local_state_test.clj @@ -16,7 +16,9 @@ (ns backtype.storm.local-state-test (:use [clojure test]) (:use [backtype.storm testing]) - (:import [backtype.storm.utils LocalState])) + (:import [backtype.storm.utils LocalState] + [org.apache.commons.io FileUtils] + [java.io File])) (deftest test-local-state (with-local-tmp [dir1 dir2] @@ -41,3 +43,13 @@ (.put ls2 "b" 8) (is (= 8 (.get ls2 "b"))) ))) + +(deftest empty-state + (with-local-tmp [dir] + 
(let [ls (LocalState. dir) + data (FileUtils/openOutputStream (File. dir "12345")) + version (FileUtils/openOutputStream (File. dir "12345.version"))] + (is (= nil (.get ls "c"))) + (.put ls "a" 1) + (is (= 1 (.get ls "a"))) + )))
