[FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization.
Throwing away TypeInformation upon serialization was previously done because the type information was not serializable. Now that it is serializable, we can (and should) keep it to provide consistent user experience, where all serializers respect the ExecutionConfig. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87d31f5c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87d31f5c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87d31f5c Branch: refs/heads/master Commit: 87d31f5cf76fa796b89201ed8c55890e7d36fc81 Parents: 87dcc89 Author: Stephan Ewen <[email protected]> Authored: Tue Mar 20 16:22:12 2018 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Mar 22 15:45:54 2018 +0100 ---------------------------------------------------------------------- .../flink/api/common/state/StateDescriptor.java | 41 +++-- .../common/state/ListStateDescriptorTest.java | 48 +----- .../common/state/MapStateDescriptorTest.java | 54 +----- .../state/ReducingStateDescriptorTest.java | 54 +----- .../api/common/state/StateDescriptorTest.java | 171 +++++++++++++++++++ .../common/state/ValueStateDescriptorTest.java | 71 -------- 6 files changed, 200 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 5ec59e4..574c836 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -27,6 +27,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -35,6 +37,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned @@ -76,19 +79,24 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl protected final String name; /** The serializer for the type. May be eagerly initialized in the constructor, - * or lazily once the type is serialized or an ExecutionConfig is provided. */ + * or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method + * is called. */ + @Nullable protected TypeSerializer<T> serializer; + /** The type information describing the value type. Only used to if the serializer + * is created lazily. */ + @Nullable + private TypeInformation<T> typeInfo; + /** Name for queries against state created from this StateDescriptor. */ + @Nullable private String queryableStateName; /** The default value returned by the state when no other value is bound to a key. */ + @Nullable protected transient T defaultValue; - /** The type information describing the value type. Only used to lazily create the serializer - * and dropped during serialization */ - private transient TypeInformation<T> typeInfo; - // ------------------------------------------------------------------------ /** @@ -99,7 +107,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ - protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) { + protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) { this.name = checkNotNull(name, "name must not be null"); this.serializer = checkNotNull(serializer, "serializer must not be null"); this.defaultValue = defaultValue; @@ -113,7 +121,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ - protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) { + protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) { this.name = checkNotNull(name, "name must not be null"); this.typeInfo = checkNotNull(typeInfo, "type information must not be null"); this.defaultValue = defaultValue; @@ -130,7 +138,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ - protected StateDescriptor(String name, Class<T> type, T defaultValue) { + protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) { this.name = checkNotNull(name, "name must not be null"); checkNotNull(type, "type class must not be null"); @@ -208,6 +216,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl * * @return Queryable state name or <code>null</code> if not set. */ + @Nullable public String getQueryableStateName() { return queryableStateName; } @@ -249,12 +258,13 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl */ public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { if (serializer == null) { - if (typeInfo != null) { - serializer = typeInfo.createSerializer(executionConfig); - } else { - throw new IllegalStateException( - "Cannot initialize serializer after TypeInformation was dropped during serialization"); - } + checkState(typeInfo != null, "no serializer and no type info"); + + // instantiate the serializer + serializer = typeInfo.createSerializer(executionConfig); + + // we can drop the type info now, no longer needed + typeInfo = null; } } @@ -285,9 +295,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl // ------------------------------------------------------------------------ private void writeObject(final ObjectOutputStream out) throws IOException { - // make sure we have a serializer before the type information gets lost - initializeSerializerUnlessSet(new ExecutionConfig()); - // write all the non-transient fields out.defaultWriteObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java index f45d296..e7e33e7 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java @@ -19,12 +19,9 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.ListSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; @@ -35,7 +32,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Tests for the {@link ListStateDescriptor}. @@ -43,7 +39,7 @@ import static org.junit.Assert.fail; public class ListStateDescriptorTest { @Test - public void testValueStateDescriptorEagerSerializer() throws Exception { + public void testListStateDescriptor() throws Exception { TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); @@ -66,48 +62,6 @@ public class ListStateDescriptorTest { assertEquals(serializer, copy.getElementSerializer()); } - @Test - public void testValueStateDescriptorLazySerializer() throws Exception { - // some different registered value - ExecutionConfig cfg = new ExecutionConfig(); - cfg.registerKryoType(TaskInfo.class); - - ListStateDescriptor<Path> descr = - new ListStateDescriptor<>("testName", Path.class); - - try { - descr.getSerializer(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) {} - - descr.initializeSerializerUnlessSet(cfg); - - assertNotNull(descr.getSerializer()); - assertTrue(descr.getSerializer() instanceof ListSerializer); - - assertNotNull(descr.getElementSerializer()); - assertTrue(descr.getElementSerializer() instanceof KryoSerializer); - - assertTrue(((KryoSerializer<?>) descr.getElementSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); - } - - @Test - public void testValueStateDescriptorAutoSerializer() throws Exception { - - ListStateDescriptor<String> descr = - new ListStateDescriptor<>("testName", String.class); - - ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); - - assertEquals("testName", copy.getName()); - - assertNotNull(copy.getSerializer()); - assertTrue(copy.getSerializer() instanceof ListSerializer); - - assertNotNull(copy.getElementSerializer()); - assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer()); - } - /** * FLINK-6775. * http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java index 2151834..4e64c0f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java @@ -19,13 +19,9 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.MapSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; @@ -36,7 +32,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Tests for the {@link MapStateDescriptor}. @@ -44,7 +39,7 @@ import static org.junit.Assert.fail; public class MapStateDescriptorTest { @Test - public void testMapStateDescriptorEagerSerializer() throws Exception { + public void testMapStateDescriptor() throws Exception { TypeSerializer<Integer> keySerializer = new KryoSerializer<>(Integer.class, new ExecutionConfig()); TypeSerializer<String> valueSerializer = new KryoSerializer<>(String.class, new ExecutionConfig()); @@ -72,53 +67,6 @@ public class MapStateDescriptorTest { assertEquals(valueSerializer, copy.getValueSerializer()); } - @Test - public void testMapStateDescriptorLazySerializer() throws Exception { - // some different registered value - ExecutionConfig cfg = new ExecutionConfig(); - cfg.registerKryoType(TaskInfo.class); - - MapStateDescriptor<Path, String> descr = - new MapStateDescriptor<>("testName", Path.class, String.class); - - try { - descr.getSerializer(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) {} - - descr.initializeSerializerUnlessSet(cfg); - - assertNotNull(descr.getSerializer()); - assertTrue(descr.getSerializer() instanceof MapSerializer); - - assertNotNull(descr.getKeySerializer()); - assertTrue(descr.getKeySerializer() instanceof KryoSerializer); - - assertTrue(((KryoSerializer<?>) descr.getKeySerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); - - assertNotNull(descr.getValueSerializer()); - assertTrue(descr.getValueSerializer() instanceof StringSerializer); - } - - @Test - public void testMapStateDescriptorAutoSerializer() throws Exception { - - MapStateDescriptor<String, Long> descr = - new MapStateDescriptor<>("testName", String.class, Long.class); - - MapStateDescriptor<String, Long> copy = CommonTestUtils.createCopySerializable(descr); - - assertEquals("testName", copy.getName()); - - assertNotNull(copy.getSerializer()); - assertTrue(copy.getSerializer() instanceof MapSerializer); - - assertNotNull(copy.getKeySerializer()); - assertEquals(StringSerializer.INSTANCE, copy.getKeySerializer()); - assertNotNull(copy.getValueSerializer()); - assertEquals(LongSerializer.INSTANCE, copy.getValueSerializer()); - } - /** * FLINK-6775. * http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java index 1e21a78..81b7c38 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java @@ -19,12 +19,9 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; @@ -33,9 +30,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; /** * Tests for the {@link ReducingStateDescriptor}. @@ -43,10 +37,9 @@ import static org.mockito.Mockito.mock; public class ReducingStateDescriptorTest extends TestLogger { @Test - public void testValueStateDescriptorEagerSerializer() throws Exception { + public void testReducingStateDescriptor() throws Exception { - @SuppressWarnings("unchecked") - ReduceFunction<String> reducer = mock(ReduceFunction.class); + ReduceFunction<String> reducer = (a, b) -> a; TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); @@ -56,6 +49,7 @@ public class ReducingStateDescriptorTest extends TestLogger { assertEquals("testName", descr.getName()); assertNotNull(descr.getSerializer()); assertEquals(serializer, descr.getSerializer()); + assertEquals(reducer, descr.getReduceFunction()); ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); @@ -64,48 +58,6 @@ public class ReducingStateDescriptorTest extends TestLogger { assertEquals(serializer, copy.getSerializer()); } - @Test - public void testValueStateDescriptorLazySerializer() throws Exception { - - @SuppressWarnings("unchecked") - ReduceFunction<Path> reducer = mock(ReduceFunction.class); - - // some different registered value - ExecutionConfig cfg = new ExecutionConfig(); - cfg.registerKryoType(TaskInfo.class); - - ReducingStateDescriptor<Path> descr = - new ReducingStateDescriptor<>("testName", reducer, Path.class); - - try { - descr.getSerializer(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) {} - - descr.initializeSerializerUnlessSet(cfg); - - assertNotNull(descr.getSerializer()); - assertTrue(descr.getSerializer() instanceof KryoSerializer); - - assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); - } - - @Test - public void testValueStateDescriptorAutoSerializer() throws Exception { - - @SuppressWarnings("unchecked") - ReduceFunction<String> reducer = mock(ReduceFunction.class); - - ReducingStateDescriptor<String> descr = - new ReducingStateDescriptor<>("testName", reducer, String.class); - - ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); - - assertEquals("testName", copy.getName()); - assertNotNull(copy.getSerializer()); - assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); - } - /** * FLINK-6775. * http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java new file mode 100644 index 0000000..59293f4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the common/shared functionality of {@link StateDescriptor}. + */ +public class StateDescriptorTest { + + @Test + public void testInitializeWithSerializer() throws Exception { + final TypeSerializer<String> serializer = StringSerializer.INSTANCE; + final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", serializer); + + assertTrue(descr.isSerializerInitialized()); + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof StringSerializer); + + // this should not have any effect + descr.initializeSerializerUnlessSet(new ExecutionConfig()); + assertTrue(descr.isSerializerInitialized()); + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof StringSerializer); + + TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr); + assertTrue(clone.isSerializerInitialized()); + assertNotNull(clone.getSerializer()); + assertTrue(clone.getSerializer() instanceof StringSerializer); + } + + @Test + public void testInitializeSerializerBeforeSerialization() throws Exception { + final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class); + + assertFalse(descr.isSerializerInitialized()); + try { + descr.getSerializer(); + fail("should fail with an exception"); + } catch (IllegalStateException ignored) {} + + descr.initializeSerializerUnlessSet(new ExecutionConfig()); + + assertTrue(descr.isSerializerInitialized()); + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof StringSerializer); + + TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr); + + assertTrue(clone.isSerializerInitialized()); + assertNotNull(clone.getSerializer()); + assertTrue(clone.getSerializer() instanceof StringSerializer); + } + + @Test + public void testInitializeSerializerAfterSerialization() throws Exception { + final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class); + + assertFalse(descr.isSerializerInitialized()); + try { + descr.getSerializer(); + fail("should fail with an exception"); + } catch (IllegalStateException ignored) {} + + TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr); + + assertFalse(clone.isSerializerInitialized()); + try { + clone.getSerializer(); + fail("should fail with an exception"); + } catch (IllegalStateException ignored) {} + + clone.initializeSerializerUnlessSet(new ExecutionConfig()); + + assertTrue(clone.isSerializerInitialized()); + assertNotNull(clone.getSerializer()); + assertTrue(clone.getSerializer() instanceof StringSerializer); + } + + @Test + public void testInitializeSerializerAfterSerializationWithCustomConfig() throws Exception { + // guard our test assumptions. + assertEquals("broken test assumption", -1, + new KryoSerializer<>(String.class, new ExecutionConfig()).getKryo() + .getRegistration(File.class).getId()); + + final ExecutionConfig config = new ExecutionConfig(); + config.registerKryoType(File.class); + + final TestStateDescriptor<Path> original = new TestStateDescriptor<>("test", Path.class); + TestStateDescriptor<Path> clone = CommonTestUtils.createCopySerializable(original); + + clone.initializeSerializerUnlessSet(config); + + // serialized one (later initialized) carries the registration + assertTrue(((KryoSerializer<?>) clone.getSerializer()).getKryo() + .getRegistration(File.class).getId() > 0); + } + + // ------------------------------------------------------------------------ + + private static class TestStateDescriptor<T> extends StateDescriptor<State, T> { + + private static final long serialVersionUID = 1L; + + TestStateDescriptor(String name, TypeSerializer<T> serializer) { + super(name, serializer, null); + } + + TestStateDescriptor(String name, TypeInformation<T> typeInfo) { + super(name, typeInfo, null); + } + + TestStateDescriptor(String name, Class<T> type) { + super(name, type, null); + } + + @Override + public State bind(StateBinder stateBinder) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Type getType() { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + return 584523; + } + + @Override + public boolean equals(Object o) { + return o != null && o.getClass() == TestStateDescriptor.class; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/87d31f5c/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java index f3b9eee..7ee58fe 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java @@ -19,24 +19,17 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.io.File; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Tests for the {@link ValueStateDescriptor}. @@ -44,70 +37,6 @@ import static org.junit.Assert.fail; public class ValueStateDescriptorTest extends TestLogger { @Test - public void testValueStateDescriptorEagerSerializer() throws Exception { - - TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); - String defaultValue = "le-value-default"; - - ValueStateDescriptor<String> descr = - new ValueStateDescriptor<>("testName", serializer, defaultValue); - - assertEquals("testName", descr.getName()); - assertEquals(defaultValue, descr.getDefaultValue()); - assertNotNull(descr.getSerializer()); - assertEquals(serializer, descr.getSerializer()); - - ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); - - assertEquals("testName", copy.getName()); - assertEquals(defaultValue, copy.getDefaultValue()); - assertNotNull(copy.getSerializer()); - assertEquals(serializer, copy.getSerializer()); - } - - @Test - public void testValueStateDescriptorLazySerializer() throws Exception { - - // some default value that goes to the generic serializer - Path defaultValue = new Path(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI()); - - // some different registered value - ExecutionConfig cfg = new ExecutionConfig(); - cfg.registerKryoType(TaskInfo.class); - - ValueStateDescriptor<Path> descr = - new ValueStateDescriptor<>("testName", Path.class, defaultValue); - - try { - descr.getSerializer(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) {} - - descr.initializeSerializerUnlessSet(cfg); - - assertNotNull(descr.getSerializer()); - assertTrue(descr.getSerializer() instanceof KryoSerializer); - - assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); - } - - @Test - public void testValueStateDescriptorAutoSerializer() throws Exception { - - String defaultValue = "le-value-default"; - - ValueStateDescriptor<String> descr = - new ValueStateDescriptor<>("testName", String.class, defaultValue); - - ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); - - assertEquals("testName", copy.getName()); - assertEquals(defaultValue, copy.getDefaultValue()); - assertNotNull(copy.getSerializer()); - assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); - } - - @Test public void testVeryLargeDefaultValue() throws Exception { // ensure that we correctly read very large data when deserializing the default value
