Repository: flink
Updated Branches:
  refs/heads/master d73cb7369 -> 83102f0ea


[FLINK-2436] [streaming] Make ByteStreamStateHandles more robust

Closes #958


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83102f0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83102f0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83102f0e

Branch: refs/heads/master
Commit: 83102f0ea052e7f7c43c9ba6aaff0dc1c24791c9
Parents: d73cb73
Author: Gyula Fora <[email protected]>
Authored: Thu Jul 30 14:07:42 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Sun Aug 2 21:14:18 2015 +0200

----------------------------------------------------------------------
 .../runtime/state/ByteStreamStateHandle.java    |  33 ++++-
 .../state/ByteStreamStateHandleTest.java        | 125 +++++++++++++++++++
 .../flink/tachyon/FileStateHandleTest.java      |  15 ++-
 3 files changed, 165 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index 257784a..bf2dca8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -35,9 +35,14 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
        private static final long serialVersionUID = -962025800339325828L;
 
        private transient Serializable state;
+       private boolean isWritten = false;
 
        public ByteStreamStateHandle(Serializable state) {
-               this.state = state;
+               if (state != null) {
+                       this.state = state;
+               } else {
+                       throw new RuntimeException("State cannot be null");
+               }
        }
 
        /**
@@ -54,16 +59,25 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
        public Serializable getState() throws Exception {
                if (!stateFetched()) {
                        ObjectInputStream stream = new ObjectInputStream(getInputStream());
-                       state = (Serializable) stream.readObject();
-                       stream.close();
+                       try {
+                               state = (Serializable) stream.readObject();
+                       } finally {
+                               stream.close();
+                       }
                }
                return state;
        }
 
        private void writeObject(ObjectOutputStream oos) throws Exception {
-               ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
-               stream.writeObject(state);
-               stream.close();
+               if (!isWritten) {
+                       ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
+                       try {
+                               stream.writeObject(state);
+                               isWritten = true;
+                       } finally {
+                               stream.close();
+                       }
+               }
                oos.defaultWriteObject();
        }
 
@@ -74,4 +88,11 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
        public boolean stateFetched() {
                return state != null;
        }
+       
+       /**
+        * Checks whether the state has already been written to the external store
+        */
+       public boolean isWritten() {
+               return isWritten;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
new file mode 100644
index 0000000..a7378b9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+public class ByteStreamStateHandleTest {
+
+       @Test
+       public void testHandle() throws Exception {
+               MockHandle handle;
+
+               try {
+                       handle = new MockHandle(null);
+                       fail();
+               } catch (RuntimeException e) {
+                       // expected behaviour
+               }
+
+               handle = new MockHandle(1);
+
+               assertEquals(1, handle.getState());
+               assertTrue(handle.stateFetched());
+               assertFalse(handle.isWritten());
+               assertFalse(handle.discarded);
+
+               MockHandle handleDs = serializeDeserialize(handle);
+
+               assertEquals(1, handle.getState());
+               assertTrue(handle.stateFetched());
+               assertTrue(handle.isWritten());
+               assertTrue(handle.generatedOutput);
+               assertFalse(handle.discarded);
+
+               assertFalse(handleDs.stateFetched());
+               assertTrue(handleDs.isWritten());
+               assertFalse(handleDs.generatedOutput);
+               assertFalse(handle.discarded);
+
+               try {
+                       handleDs.getState();
+                       fail();
+               } catch (UnsupportedOperationException e) {
+                       // good
+               }
+
+               MockHandle handleDs2 = serializeDeserialize(handleDs);
+
+               assertFalse(handleDs2.stateFetched());
+               assertTrue(handleDs2.isWritten());
+               assertFalse(handleDs.generatedOutput);
+               assertFalse(handleDs2.generatedOutput);
+               assertFalse(handleDs2.discarded);
+
+               handleDs2.discardState();
+               assertTrue(handleDs2.discarded);
+
+       }
+
+       @SuppressWarnings("unchecked")
+       private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
+                       ClassNotFoundException {
+               byte[] serialized = InstantiationUtil.serializeObject(handle);
+               return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
+                               .getContextClassLoader());
+       }
+
+       private static class MockHandle extends ByteStreamStateHandle {
+
+               private static final long serialVersionUID = 1L;
+
+               public MockHandle(Serializable state) {
+                       super(state);
+               }
+
+               boolean discarded = false;
+               transient boolean generatedOutput = false;
+
+               @Override
+               public void discardState() throws Exception {
+                       discarded = true;
+               }
+
+               @Override
+               protected OutputStream getOutputStream() throws Exception {
+                       generatedOutput = true;
+                       return new ByteArrayOutputStream();
+               }
+
+               @Override
+               protected InputStream getInputStream() throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
index 82b5d35..2873c78 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.tachyon;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -90,13 +91,23 @@ public class FileStateHandleTest {
                                + hdPath);
 
                FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
+               
+               try {
+                       handleProvider.createStateHandle(null);
+                       fail();
+               } catch (RuntimeException e) {
+                       // good
+               }
 
                assertTrue(handle.stateFetched());
+               assertFalse(handle.isWritten());
 
                // Serialize the handle so it writes the value to hdfs
                SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
                                handle);
-
+               
+               assertTrue(handle.isWritten());
+               
                // Deserialize the handle and verify that the state is not fetched yet
                FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
                                .deserializeValue(Thread.currentThread().getContextClassLoader());
@@ -107,7 +118,7 @@ public class FileStateHandleTest {
 
                // Test whether discard removes the checkpoint file properly
                assertTrue(hdfs.listFiles(hdPath, true).hasNext());
-               handle.discardState();
+               deserializedHandle.discardState();
                assertFalse(hdfs.listFiles(hdPath, true).hasNext());
 
        }

Reply via email to