Repository: flink Updated Branches: refs/heads/master d73cb7369 -> 83102f0ea
[FLINK-2436] [streaming] Make ByteStreamStateHandles more robust Closes #958 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83102f0e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83102f0e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83102f0e Branch: refs/heads/master Commit: 83102f0ea052e7f7c43c9ba6aaff0dc1c24791c9 Parents: d73cb73 Author: Gyula Fora <[email protected]> Authored: Thu Jul 30 14:07:42 2015 +0200 Committer: Gyula Fora <[email protected]> Committed: Sun Aug 2 21:14:18 2015 +0200 ---------------------------------------------------------------------- .../runtime/state/ByteStreamStateHandle.java | 33 ++++- .../state/ByteStreamStateHandleTest.java | 125 +++++++++++++++++++ .../flink/tachyon/FileStateHandleTest.java | 15 ++- 3 files changed, 165 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java index 257784a..bf2dca8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java @@ -35,9 +35,14 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable> private static final long serialVersionUID = -962025800339325828L; private transient Serializable state; + private boolean isWritten = false; public ByteStreamStateHandle(Serializable state) { - this.state = state; + if (state != null) { + this.state = state; + } else { + throw new RuntimeException("State cannot be null"); + } } /** @@ -54,16 +59,25 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable> public Serializable getState() throws Exception { if (!stateFetched()) { ObjectInputStream stream = new ObjectInputStream(getInputStream()); - state = (Serializable) stream.readObject(); - stream.close(); + try { + state = (Serializable) stream.readObject(); + } finally { + stream.close(); + } } return state; } private void writeObject(ObjectOutputStream oos) throws Exception { - ObjectOutputStream stream = new ObjectOutputStream(getOutputStream()); - stream.writeObject(state); - stream.close(); + if (!isWritten) { + ObjectOutputStream stream = new ObjectOutputStream(getOutputStream()); + try { + stream.writeObject(state); + isWritten = true; + } finally { + stream.close(); + } + } oos.defaultWriteObject(); } @@ -74,4 +88,11 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable> public boolean stateFetched() { return state != null; } + + /** + * Checks whether the state has already been written to the external store + */ + public boolean isWritten() { + return isWritten; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java new file mode 100644 index 0000000..a7378b9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.flink.util.InstantiationUtil; +import org.junit.Test; + +public class ByteStreamStateHandleTest { + + @Test + public void testHandle() throws Exception { + MockHandle handle; + + try { + handle = new MockHandle(null); + fail(); + } catch (RuntimeException e) { + // expected behaviour + } + + handle = new MockHandle(1); + + assertEquals(1, handle.getState()); + assertTrue(handle.stateFetched()); + assertFalse(handle.isWritten()); + assertFalse(handle.discarded); + + MockHandle handleDs = serializeDeserialize(handle); + + assertEquals(1, handle.getState()); + assertTrue(handle.stateFetched()); + assertTrue(handle.isWritten()); + assertTrue(handle.generatedOutput); + assertFalse(handle.discarded); + + assertFalse(handleDs.stateFetched()); + assertTrue(handleDs.isWritten()); + assertFalse(handleDs.generatedOutput); + assertFalse(handle.discarded); + + try { + handleDs.getState(); + fail(); + } catch (UnsupportedOperationException e) { + // good + } + + MockHandle handleDs2 = serializeDeserialize(handleDs); + + assertFalse(handleDs2.stateFetched()); + assertTrue(handleDs2.isWritten()); + assertFalse(handleDs.generatedOutput); + assertFalse(handleDs2.generatedOutput); + assertFalse(handleDs2.discarded); + + handleDs2.discardState(); + assertTrue(handleDs2.discarded); + + } + + @SuppressWarnings("unchecked") + private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException, + ClassNotFoundException { + byte[] serialized = InstantiationUtil.serializeObject(handle); + return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread() + .getContextClassLoader()); + } + + private static class MockHandle extends ByteStreamStateHandle { + + private static final long serialVersionUID = 1L; + + public MockHandle(Serializable state) { + super(state); + } + + boolean discarded = false; + transient boolean generatedOutput = false; + + @Override + public void discardState() throws Exception { + discarded = true; + } + + @Override + protected OutputStream getOutputStream() throws Exception { + generatedOutput = true; + return new ByteArrayOutputStream(); + } + + @Override + protected InputStream getInputStream() throws Exception { + throw new UnsupportedOperationException(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java index 82b5d35..2873c78 100644 --- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java +++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java @@ -21,6 +21,7 @@ package org.apache.flink.tachyon; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -90,13 +91,23 @@ public class FileStateHandleTest { + hdPath); FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state); + + try { + handleProvider.createStateHandle(null); + fail(); + } catch (RuntimeException e) { + // good + } assertTrue(handle.stateFetched()); + assertFalse(handle.isWritten()); // Serialize the handle so it writes the value to hdfs SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>( handle); - + + assertTrue(handle.isWritten()); + // Deserialize the handle and verify that the state is not fetched yet FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle .deserializeValue(Thread.currentThread().getContextClassLoader()); @@ -107,7 +118,7 @@ public class FileStateHandleTest { // Test whether discard removes the checkpoint file properly assertTrue(hdfs.listFiles(hdPath, true).hasNext()); - handle.discardState(); + deserializedHandle.discardState(); assertFalse(hdfs.listFiles(hdPath, true).hasNext()); }
