[FLINK-2888] [streaming] State backends return copies of the default values


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6424ce57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6424ce57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6424ce57

Branch: refs/heads/master
Commit: 6424ce57b4012af64adc88ef37f341d6a5aec744
Parents: 2d64715
Author: Stephan Ewen <[email protected]>
Authored: Thu Oct 22 11:24:12 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Sun Oct 25 19:13:11 2015 +0100

----------------------------------------------------------------------
 .../runtime/state/AbstractHeapKvState.java      |  3 +-
 .../runtime/state/FileStateBackendTest.java     | 40 ++++++++++++++++----
 .../runtime/state/MemoryStateBackendTest.java   | 31 ++++++++++++---
 3 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6424ce57/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
index 12250b9..23703b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
@@ -88,7 +88,8 @@ public abstract class AbstractHeapKvState<K, V, Backend 
extends StateBackend<Bac
        @Override
        public V value() {
                V value = state.get(currentKey);
-               return value != null ? value : defaultValue;
+               return value != null ? value : 
+                               (defaultValue == null ? null : 
valueSerializer.copy(defaultValue));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6424ce57/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 481fb98..7182a36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -23,20 +23,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
 
 import org.junit.Test;
 
@@ -382,6 +378,36 @@ public class FileStateBackendTest {
                        deleteDirectorySilently(tempDir);
                }
        }
+
+       @Test
+       public void testCopyDefaultValue() {
+               File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
+               try {
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new 
FsStateBackend(localFileUri(tempDir)));
+                       backend.initializeForJob(new JobID());
+                       
+                       KvState<Integer, IntValue, FsStateBackend> kv =
+                                       
backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new 
IntValue(-1));
+
+                       kv.setCurrentKey(1);
+                       IntValue default1 = kv.value();
+
+                       kv.setCurrentKey(2);
+                       IntValue default2 = kv.value();
+
+                       assertNotNull(default1);
+                       assertNotNull(default2);
+                       assertEquals(default1, default2);
+                       assertFalse(default1 == default2);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       deleteDirectorySilently(tempDir);
+               }
+       }
        
        // 
------------------------------------------------------------------------
        //  Utilities
@@ -411,7 +437,7 @@ public class FileStateBackendTest {
        }
        
        private static String localFileUri(File path) {
-               return (OperatingSystem.isWindows() ? "file:/" : "file://") + 
path.getAbsolutePath();
+               return path.toURI().toString();
        }
        
        private static void validateBytesInStream(InputStream is, byte[] data) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/6424ce57/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 5f95b33..87a050b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -21,14 +21,11 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
 import org.junit.Test;
 
@@ -279,4 +276,28 @@ public class MemoryStateBackendTest {
                        fail(e.getMessage());
                }
        }
+       
+       @Test
+       public void testCopyDefaultValue() {
+               try {
+                       MemoryStateBackend backend = new MemoryStateBackend();
+                       KvState<Integer, IntValue, MemoryStateBackend> kv =
+                                       
backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new 
IntValue(-1));
+
+                       kv.setCurrentKey(1);
+                       IntValue default1 = kv.value();
+
+                       kv.setCurrentKey(2);
+                       IntValue default2 = kv.value();
+                       
+                       assertNotNull(default1);
+                       assertNotNull(default2);
+                       assertEquals(default1, default2);
+                       assertFalse(default1 == default2);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
 }

Reply via email to