[FLINK-2888] [streaming] State backends return copies of the default values
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6424ce57 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6424ce57 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6424ce57 Branch: refs/heads/master Commit: 6424ce57b4012af64adc88ef37f341d6a5aec744 Parents: 2d64715 Author: Stephan Ewen <[email protected]> Authored: Thu Oct 22 11:24:12 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Sun Oct 25 19:13:11 2015 +0100 ---------------------------------------------------------------------- .../runtime/state/AbstractHeapKvState.java | 3 +- .../runtime/state/FileStateBackendTest.java | 40 ++++++++++++++++---- .../runtime/state/MemoryStateBackendTest.java | 31 ++++++++++++--- 3 files changed, 61 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6424ce57/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java index 12250b9..23703b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java @@ -88,7 +88,8 @@ public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Bac @Override public V value() { V value = state.get(currentKey); - return value != null ? value : defaultValue; + return value != null ? value : + (defaultValue == null ? null : valueSerializer.copy(defaultValue)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6424ce57/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java index 481fb98..7182a36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -23,20 +23,16 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.IntValueSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.types.IntValue; import org.apache.flink.types.StringValue; -import org.apache.flink.util.OperatingSystem; import org.junit.Test; @@ -382,6 +378,36 @@ public class FileStateBackendTest { deleteDirectorySilently(tempDir); } } + + @Test + public void testCopyDefaultValue() { + File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + try { + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); + backend.initializeForJob(new JobID()); + + KvState<Integer, IntValue, FsStateBackend> kv = + backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1)); + + kv.setCurrentKey(1); + IntValue default1 = kv.value(); + + kv.setCurrentKey(2); + IntValue default2 = kv.value(); + + assertNotNull(default1); + assertNotNull(default2); + assertEquals(default1, default2); + assertFalse(default1 == default2); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + deleteDirectorySilently(tempDir); + } + } // ------------------------------------------------------------------------ // Utilities @@ -411,7 +437,7 @@ public class FileStateBackendTest { } private static String localFileUri(File path) { - return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath(); + return path.toURI().toString(); } private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/6424ce57/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index 5f95b33..87a050b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -21,14 +21,11 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.IntValueSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.types.IntValue; import org.apache.flink.types.StringValue; import org.junit.Test; @@ -279,4 +276,28 @@ public class MemoryStateBackendTest { fail(e.getMessage()); } } + + @Test + public void testCopyDefaultValue() { + try { + MemoryStateBackend backend = new MemoryStateBackend(); + KvState<Integer, IntValue, MemoryStateBackend> kv = + backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1)); + + kv.setCurrentKey(1); + IntValue default1 = kv.value(); + + kv.setCurrentKey(2); + IntValue default2 = kv.value(); + + assertNotNull(default1); + assertNotNull(default2); + assertEquals(default1, default2); + assertFalse(default1 == default2); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } }
