STORM-307: reset LocalState if files are corrupt

* allow supervisor to start if LocalState was corrupted

Change-Id: I3dbf957dd2e37c9c6be39dab29a9a71e99b7a2c3


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2ecd44a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2ecd44a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2ecd44a

Branch: refs/heads/security
Commit: b2ecd44a7ee13d6c98171c63d056bdef481d6e0f
Parents: 1babd83
Author: Wurstmeister <[email protected]>
Authored: Sun Oct 5 21:46:24 2014 +0100
Committer: Wurstmeister <[email protected]>
Committed: Sun Oct 5 21:46:24 2014 +0100

----------------------------------------------------------------------
 .../jvm/backtype/storm/utils/LocalState.java    | 28 ++++++++++++++------
 .../clj/backtype/storm/local_state_test.clj     | 14 +++++++++-
 2 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b2ecd44a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
index 0d0ae07..f58e79c 100644
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@ -18,6 +18,8 @@
 package backtype.storm.utils;
 
 import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Map;
@@ -30,25 +32,35 @@ import java.io.IOException;
  * Every read/write hits disk.
  */
 public class LocalState {
+    public static Logger LOG = LoggerFactory.getLogger(LocalState.class);
+
     private VersionedStore _vs;
     
     public LocalState(String backingDir) throws IOException {
         _vs = new VersionedStore(backingDir);
     }
-    
+
     public synchronized Map<Object, Object> snapshot() throws IOException {
         int attempts = 0;
+        Map<Object, Object> result = new HashMap<Object, Object>();
         while(true) {
             String latestPath = _vs.mostRecentVersionPath();
-            if(latestPath==null) return new HashMap<Object, Object>();
-            try {
-                return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath)));
-            } catch(IOException e) {
-                attempts++;
-                if(attempts >= 10) {
-                    throw e;
+            if(latestPath != null) {
+                try {
+                    byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath));
+                    if (serialized.length == 0) {
+                        LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath);
+                    } else {
+                        result = (Map<Object, Object>) Utils.deserialize(serialized);
+                    }
+                } catch (IOException e) {
+                    attempts++;
+                    if (attempts >= 10) {
+                        throw e;
+                    }
                 }
             }
+            return result;
         }
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/b2ecd44a/storm-core/test/clj/backtype/storm/local_state_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj
index ba2b969..4bd58ec 100644
--- a/storm-core/test/clj/backtype/storm/local_state_test.clj
+++ b/storm-core/test/clj/backtype/storm/local_state_test.clj
@@ -16,7 +16,9 @@
 (ns backtype.storm.local-state-test
   (:use [clojure test])
   (:use [backtype.storm testing])
-  (:import [backtype.storm.utils LocalState]))
+  (:import [backtype.storm.utils LocalState]
+           [org.apache.commons.io FileUtils]
+           [java.io File]))
 
 (deftest test-local-state
   (with-local-tmp [dir1 dir2]
@@ -41,3 +43,13 @@
       (.put ls2 "b" 8)
       (is (= 8 (.get ls2 "b")))
       )))
+
+(deftest empty-state
+  (with-local-tmp [dir]
+    (let [ls (LocalState. dir)
+          data (FileUtils/openOutputStream (File. dir "12345"))
+          version (FileUtils/openOutputStream (File. dir "12345.version"))]
+      (is (= nil (.get ls "c")))
+      (.put ls "a" 1)
+      (is (= 1 (.get ls "a")))
+  )))

Reply via email to