[FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon 
serialization.

Throwing away TypeInformation upon serialization was previously done because 
the type
information was not serializable. Now that it is serializable, we can (and 
should) keep
it to provide consistent user experience, where all serializers respect the 
ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87d31f5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87d31f5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87d31f5c

Branch: refs/heads/master
Commit: 87d31f5cf76fa796b89201ed8c55890e7d36fc81
Parents: 87dcc89
Author: Stephan Ewen <[email protected]>
Authored: Tue Mar 20 16:22:12 2018 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Mar 22 15:45:54 2018 +0100

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java |  41 +++--
 .../common/state/ListStateDescriptorTest.java   |  48 +-----
 .../common/state/MapStateDescriptorTest.java    |  54 +-----
 .../state/ReducingStateDescriptorTest.java      |  54 +-----
 .../api/common/state/StateDescriptorTest.java   | 171 +++++++++++++++++++
 .../common/state/ValueStateDescriptorTest.java  |  71 --------
 6 files changed, 200 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 5ec59e4..574c836 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,6 +37,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for 
creating partitioned
@@ -76,19 +79,24 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
        protected final String name;
 
        /** The serializer for the type. May be eagerly initialized in the 
constructor,
-        * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+        * or lazily once the {@link 
#initializeSerializerUnlessSet(ExecutionConfig)} method
+        * is called. */
+       @Nullable
        protected TypeSerializer<T> serializer;
 
+       /** The type information describing the value type. Only used to if the 
serializer
+        * is created lazily. */
+       @Nullable
+       private TypeInformation<T> typeInfo;
+
        /** Name for queries against state created from this StateDescriptor. */
+       @Nullable
        private String queryableStateName;
 
        /** The default value returned by the state when no other value is 
bound to a key. */
+       @Nullable
        protected transient T defaultValue;
 
-       /** The type information describing the value type. Only used to lazily 
create the serializer
-        * and dropped during serialization */
-       private transient TypeInformation<T> typeInfo;
-
        // 
------------------------------------------------------------------------
 
        /**
@@ -99,7 +107,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         * @param defaultValue The default value that will be set when 
requesting state without setting
         *                     a value before.
         */
-       protected StateDescriptor(String name, TypeSerializer<T> serializer, T 
defaultValue) {
+       protected StateDescriptor(String name, TypeSerializer<T> serializer, 
@Nullable T defaultValue) {
                this.name = checkNotNull(name, "name must not be null");
                this.serializer = checkNotNull(serializer, "serializer must not 
be null");
                this.defaultValue = defaultValue;
@@ -113,7 +121,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         * @param defaultValue The default value that will be set when 
requesting state without setting
         *                     a value before.
         */
-       protected StateDescriptor(String name, TypeInformation<T> typeInfo, T 
defaultValue) {
+       protected StateDescriptor(String name, TypeInformation<T> typeInfo, 
@Nullable T defaultValue) {
                this.name = checkNotNull(name, "name must not be null");
                this.typeInfo = checkNotNull(typeInfo, "type information must 
not be null");
                this.defaultValue = defaultValue;
@@ -130,7 +138,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         * @param defaultValue The default value that will be set when 
requesting state without setting
         *                     a value before.
         */
-       protected StateDescriptor(String name, Class<T> type, T defaultValue) {
+       protected StateDescriptor(String name, Class<T> type, @Nullable T 
defaultValue) {
                this.name = checkNotNull(name, "name must not be null");
                checkNotNull(type, "type class must not be null");
 
@@ -208,6 +216,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         *
         * @return Queryable state name or <code>null</code> if not set.
         */
+       @Nullable
        public String getQueryableStateName() {
                return queryableStateName;
        }
@@ -249,12 +258,13 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         */
        public void initializeSerializerUnlessSet(ExecutionConfig 
executionConfig) {
                if (serializer == null) {
-                       if (typeInfo != null) {
-                               serializer = 
typeInfo.createSerializer(executionConfig);
-                       } else {
-                               throw new IllegalStateException(
-                                               "Cannot initialize serializer 
after TypeInformation was dropped during serialization");
-                       }
+                       checkState(typeInfo != null, "no serializer and no type 
info");
+
+                       // instantiate the serializer
+                       serializer = typeInfo.createSerializer(executionConfig);
+
+                       // we can drop the type info now, no longer needed
+                       typeInfo  = null;
                }
        }
 
@@ -285,9 +295,6 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
        // 
------------------------------------------------------------------------
 
        private void writeObject(final ObjectOutputStream out) throws 
IOException {
-               // make sure we have a serializer before the type information 
gets lost
-               initializeSerializerUnlessSet(new ExecutionConfig());
-
                // write all the non-transient fields
                out.defaultWriteObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index f45d296..e7e33e7 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -19,12 +19,9 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
@@ -35,7 +32,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link ListStateDescriptor}.
@@ -43,7 +39,7 @@ import static org.junit.Assert.fail;
 public class ListStateDescriptorTest {
 
        @Test
-       public void testValueStateDescriptorEagerSerializer() throws Exception {
+       public void testListStateDescriptor() throws Exception {
 
                TypeSerializer<String> serializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
 
@@ -66,48 +62,6 @@ public class ListStateDescriptorTest {
                assertEquals(serializer, copy.getElementSerializer());
        }
 
-       @Test
-       public void testValueStateDescriptorLazySerializer() throws Exception {
-               // some different registered value
-               ExecutionConfig cfg = new ExecutionConfig();
-               cfg.registerKryoType(TaskInfo.class);
-
-               ListStateDescriptor<Path> descr =
-                               new ListStateDescriptor<>("testName", 
Path.class);
-
-               try {
-                       descr.getSerializer();
-                       fail("should cause an exception");
-               } catch (IllegalStateException ignored) {}
-
-               descr.initializeSerializerUnlessSet(cfg);
-
-               assertNotNull(descr.getSerializer());
-               assertTrue(descr.getSerializer() instanceof ListSerializer);
-
-               assertNotNull(descr.getElementSerializer());
-               assertTrue(descr.getElementSerializer() instanceof 
KryoSerializer);
-
-               assertTrue(((KryoSerializer<?>) 
descr.getElementSerializer()).getKryo().getRegistration(TaskInfo.class).getId() 
> 0);
-       }
-
-       @Test
-       public void testValueStateDescriptorAutoSerializer() throws Exception {
-
-               ListStateDescriptor<String> descr =
-                               new ListStateDescriptor<>("testName", 
String.class);
-
-               ListStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
-
-               assertEquals("testName", copy.getName());
-
-               assertNotNull(copy.getSerializer());
-               assertTrue(copy.getSerializer() instanceof ListSerializer);
-
-               assertNotNull(copy.getElementSerializer());
-               assertEquals(StringSerializer.INSTANCE, 
copy.getElementSerializer());
-       }
-
        /**
         * FLINK-6775.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index 2151834..4e64c0f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -19,13 +19,9 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
@@ -36,7 +32,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link MapStateDescriptor}.
@@ -44,7 +39,7 @@ import static org.junit.Assert.fail;
 public class MapStateDescriptorTest {
 
        @Test
-       public void testMapStateDescriptorEagerSerializer() throws Exception {
+       public void testMapStateDescriptor() throws Exception {
 
                TypeSerializer<Integer> keySerializer = new 
KryoSerializer<>(Integer.class, new ExecutionConfig());
                TypeSerializer<String> valueSerializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
@@ -72,53 +67,6 @@ public class MapStateDescriptorTest {
                assertEquals(valueSerializer, copy.getValueSerializer());
        }
 
-       @Test
-       public void testMapStateDescriptorLazySerializer() throws Exception {
-               // some different registered value
-               ExecutionConfig cfg = new ExecutionConfig();
-               cfg.registerKryoType(TaskInfo.class);
-
-               MapStateDescriptor<Path, String> descr =
-                               new MapStateDescriptor<>("testName", 
Path.class, String.class);
-
-               try {
-                       descr.getSerializer();
-                       fail("should cause an exception");
-               } catch (IllegalStateException ignored) {}
-
-               descr.initializeSerializerUnlessSet(cfg);
-
-               assertNotNull(descr.getSerializer());
-               assertTrue(descr.getSerializer() instanceof MapSerializer);
-
-               assertNotNull(descr.getKeySerializer());
-               assertTrue(descr.getKeySerializer() instanceof KryoSerializer);
-
-               assertTrue(((KryoSerializer<?>) 
descr.getKeySerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 
0);
-
-               assertNotNull(descr.getValueSerializer());
-               assertTrue(descr.getValueSerializer() instanceof 
StringSerializer);
-       }
-
-       @Test
-       public void testMapStateDescriptorAutoSerializer() throws Exception {
-
-               MapStateDescriptor<String, Long> descr =
-                               new MapStateDescriptor<>("testName", 
String.class, Long.class);
-
-               MapStateDescriptor<String, Long> copy = 
CommonTestUtils.createCopySerializable(descr);
-
-               assertEquals("testName", copy.getName());
-
-               assertNotNull(copy.getSerializer());
-               assertTrue(copy.getSerializer() instanceof MapSerializer);
-
-               assertNotNull(copy.getKeySerializer());
-               assertEquals(StringSerializer.INSTANCE, 
copy.getKeySerializer());
-               assertNotNull(copy.getValueSerializer());
-               assertEquals(LongSerializer.INSTANCE, 
copy.getValueSerializer());
-       }
-
        /**
         * FLINK-6775.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 1e21a78..81b7c38 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -19,12 +19,9 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -33,9 +30,6 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the {@link ReducingStateDescriptor}.
@@ -43,10 +37,9 @@ import static org.mockito.Mockito.mock;
 public class ReducingStateDescriptorTest extends TestLogger {
 
        @Test
-       public void testValueStateDescriptorEagerSerializer() throws Exception {
+       public void testReducingStateDescriptor() throws Exception {
 
-               @SuppressWarnings("unchecked")
-               ReduceFunction<String> reducer = mock(ReduceFunction.class);
+               ReduceFunction<String> reducer = (a, b) -> a;
 
                TypeSerializer<String> serializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
 
@@ -56,6 +49,7 @@ public class ReducingStateDescriptorTest extends TestLogger {
                assertEquals("testName", descr.getName());
                assertNotNull(descr.getSerializer());
                assertEquals(serializer, descr.getSerializer());
+               assertEquals(reducer, descr.getReduceFunction());
 
                ReducingStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
 
@@ -64,48 +58,6 @@ public class ReducingStateDescriptorTest extends TestLogger {
                assertEquals(serializer, copy.getSerializer());
        }
 
-       @Test
-       public void testValueStateDescriptorLazySerializer() throws Exception {
-
-               @SuppressWarnings("unchecked")
-               ReduceFunction<Path> reducer = mock(ReduceFunction.class);
-
-               // some different registered value
-               ExecutionConfig cfg = new ExecutionConfig();
-               cfg.registerKryoType(TaskInfo.class);
-
-               ReducingStateDescriptor<Path> descr =
-                               new ReducingStateDescriptor<>("testName", 
reducer, Path.class);
-
-               try {
-                       descr.getSerializer();
-                       fail("should cause an exception");
-               } catch (IllegalStateException ignored) {}
-
-               descr.initializeSerializerUnlessSet(cfg);
-
-               assertNotNull(descr.getSerializer());
-               assertTrue(descr.getSerializer() instanceof KryoSerializer);
-
-               assertTrue(((KryoSerializer<?>) 
descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
-       }
-
-       @Test
-       public void testValueStateDescriptorAutoSerializer() throws Exception {
-
-               @SuppressWarnings("unchecked")
-               ReduceFunction<String> reducer = mock(ReduceFunction.class);
-
-               ReducingStateDescriptor<String> descr =
-                               new ReducingStateDescriptor<>("testName", 
reducer, String.class);
-
-               ReducingStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
-
-               assertEquals("testName", copy.getName());
-               assertNotNull(copy.getSerializer());
-               assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
-       }
-
        /**
         * FLINK-6775.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
new file mode 100644
index 0000000..59293f4
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the common/shared functionality of {@link StateDescriptor}.
+ */
+public class StateDescriptorTest {
+
+       @Test
+       public void testInitializeWithSerializer() throws Exception {
+               final TypeSerializer<String> serializer = 
StringSerializer.INSTANCE;
+               final TestStateDescriptor<String> descr = new 
TestStateDescriptor<>("test", serializer);
+
+               assertTrue(descr.isSerializerInitialized());
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof StringSerializer);
+
+               // this should not have any effect
+               descr.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertTrue(descr.isSerializerInitialized());
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof StringSerializer);
+
+               TestStateDescriptor<String> clone = 
CommonTestUtils.createCopySerializable(descr);
+               assertTrue(clone.isSerializerInitialized());
+               assertNotNull(clone.getSerializer());
+               assertTrue(clone.getSerializer() instanceof StringSerializer);
+       }
+
+       @Test
+       public void testInitializeSerializerBeforeSerialization() throws 
Exception {
+               final TestStateDescriptor<String> descr = new 
TestStateDescriptor<>("test", String.class);
+
+               assertFalse(descr.isSerializerInitialized());
+               try {
+                       descr.getSerializer();
+                       fail("should fail with an exception");
+               } catch (IllegalStateException ignored) {}
+
+               descr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               assertTrue(descr.isSerializerInitialized());
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof StringSerializer);
+
+               TestStateDescriptor<String> clone = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertTrue(clone.isSerializerInitialized());
+               assertNotNull(clone.getSerializer());
+               assertTrue(clone.getSerializer() instanceof StringSerializer);
+       }
+
+       @Test
+       public void testInitializeSerializerAfterSerialization() throws 
Exception {
+               final TestStateDescriptor<String> descr = new 
TestStateDescriptor<>("test", String.class);
+
+               assertFalse(descr.isSerializerInitialized());
+               try {
+                       descr.getSerializer();
+                       fail("should fail with an exception");
+               } catch (IllegalStateException ignored) {}
+
+               TestStateDescriptor<String> clone = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertFalse(clone.isSerializerInitialized());
+               try {
+                       clone.getSerializer();
+                       fail("should fail with an exception");
+               } catch (IllegalStateException ignored) {}
+
+               clone.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               assertTrue(clone.isSerializerInitialized());
+               assertNotNull(clone.getSerializer());
+               assertTrue(clone.getSerializer() instanceof StringSerializer);
+       }
+
+       @Test
+       public void 
testInitializeSerializerAfterSerializationWithCustomConfig() throws Exception {
+               // guard our test assumptions.
+               assertEquals("broken test assumption", -1,
+                               new KryoSerializer<>(String.class, new 
ExecutionConfig()).getKryo()
+                                               
.getRegistration(File.class).getId());
+
+               final ExecutionConfig config = new ExecutionConfig();
+               config.registerKryoType(File.class);
+
+               final TestStateDescriptor<Path> original = new 
TestStateDescriptor<>("test", Path.class);
+               TestStateDescriptor<Path> clone = 
CommonTestUtils.createCopySerializable(original);
+
+               clone.initializeSerializerUnlessSet(config);
+
+               // serialized one (later initialized) carries the registration
+               assertTrue(((KryoSerializer<?>) clone.getSerializer()).getKryo()
+                               .getRegistration(File.class).getId() > 0);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class TestStateDescriptor<T> extends 
StateDescriptor<State, T> {
+
+               private static final long serialVersionUID = 1L;
+
+               TestStateDescriptor(String name, TypeSerializer<T> serializer) {
+                       super(name, serializer, null);
+               }
+
+               TestStateDescriptor(String name, TypeInformation<T> typeInfo) {
+                       super(name, typeInfo, null);
+               }
+
+               TestStateDescriptor(String name, Class<T> type) {
+                       super(name, type, null);
+               }
+
+               @Override
+               public State bind(StateBinder stateBinder) throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Type getType() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public int hashCode() {
+                       return 584523;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o != null && o.getClass() == 
TestStateDescriptor.class;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index f3b9eee..7ee58fe 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -19,24 +19,17 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.io.File;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link ValueStateDescriptor}.
@@ -44,70 +37,6 @@ import static org.junit.Assert.fail;
 public class ValueStateDescriptorTest extends TestLogger {
 
        @Test
-       public void testValueStateDescriptorEagerSerializer() throws Exception {
-
-               TypeSerializer<String> serializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
-               String defaultValue = "le-value-default";
-
-               ValueStateDescriptor<String> descr =
-                               new ValueStateDescriptor<>("testName", 
serializer, defaultValue);
-
-               assertEquals("testName", descr.getName());
-               assertEquals(defaultValue, descr.getDefaultValue());
-               assertNotNull(descr.getSerializer());
-               assertEquals(serializer, descr.getSerializer());
-
-               ValueStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
-
-               assertEquals("testName", copy.getName());
-               assertEquals(defaultValue, copy.getDefaultValue());
-               assertNotNull(copy.getSerializer());
-               assertEquals(serializer, copy.getSerializer());
-       }
-
-       @Test
-       public void testValueStateDescriptorLazySerializer() throws Exception {
-
-               // some default value that goes to the generic serializer
-               Path defaultValue = new Path(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI());
-
-               // some different registered value
-               ExecutionConfig cfg = new ExecutionConfig();
-               cfg.registerKryoType(TaskInfo.class);
-
-               ValueStateDescriptor<Path> descr =
-                               new ValueStateDescriptor<>("testName", 
Path.class, defaultValue);
-
-               try {
-                       descr.getSerializer();
-                       fail("should cause an exception");
-               } catch (IllegalStateException ignored) {}
-
-               descr.initializeSerializerUnlessSet(cfg);
-
-               assertNotNull(descr.getSerializer());
-               assertTrue(descr.getSerializer() instanceof KryoSerializer);
-
-               assertTrue(((KryoSerializer<?>) 
descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
-       }
-
-       @Test
-       public void testValueStateDescriptorAutoSerializer() throws Exception {
-
-               String defaultValue = "le-value-default";
-
-               ValueStateDescriptor<String> descr =
-                               new ValueStateDescriptor<>("testName", 
String.class, defaultValue);
-
-               ValueStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
-
-               assertEquals("testName", copy.getName());
-               assertEquals(defaultValue, copy.getDefaultValue());
-               assertNotNull(copy.getSerializer());
-               assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
-       }
-
-       @Test
        public void testVeryLargeDefaultValue() throws Exception {
                // ensure that we correctly read very large data when 
deserializing the default value
 

Reply via email to