[
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931129#comment-15931129
]
ASF GitHub Bot commented on FLINK-6018:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3562#discussion_r106776658
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
---
@@ -162,22 +176,422 @@ protected CheckpointStreamFactory
createStreamFactory() throws Exception {
}
@Test
+ public void testBackendUsesRegisteredKryoDefaultSerializer() throws
Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ // cast because our test serializer is not typed to TestPojo
+ env.getExecutionConfig()
+ .addDefaultKryoSerializer(TestPojo.class,
(Class) ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new
GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig())
instanceof KryoSerializer);
+
+ pojoType.createSerializer(env.getExecutionConfig());
+
+ ValueStateDescriptor<TestPojo> kvId = new
ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state =
backend.getPartitionedState(VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // backends that eagerly serializer will fail when updating
state, others will
+ // fail only when performing the snapshot
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ runSnapshot(backend.snapshot(682375462378L, 2,
streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1,
numExceptions);
+ }
+
+ @Test
+ public void
testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws
Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ // cast because our test serializer is not typed to TestPojo
+ env.getExecutionConfig()
+ .addDefaultKryoSerializer(TestPojo.class,
(Class) ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new
GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig())
instanceof KryoSerializer);
+
+ pojoType.createSerializer(env.getExecutionConfig());
+
+ ValueStateDescriptor<TestPojo> kvId = new
ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state =
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+ assert state instanceof InternalValueState;
+ ((InternalValueState)
state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ // backends that eagerly serializer will fail when updating
state, others will
+ // fail only when performing the snapshot
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ runSnapshot(backend.snapshot(682375462378L, 2,
streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1,
numExceptions);
+ }
+
+ @Test
+ public void testBackendUsesRegisteredKryoSerializer() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ env.getExecutionConfig()
+ .registerTypeWithKryoSerializer(TestPojo.class,
ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new
GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig())
instanceof KryoSerializer);
+
+ pojoType.createSerializer(env.getExecutionConfig());
+
+ ValueStateDescriptor<TestPojo> kvId = new
ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state =
backend.getPartitionedState(VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // backends that eagerly serializer will fail when updating
state, others will
+ // fail only when performing the snapshot
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ runSnapshot(backend.snapshot(682375462378L, 2,
streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1,
numExceptions);
+ }
+
+ @Test
+ public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate()
throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ env.getExecutionConfig()
+ .registerTypeWithKryoSerializer(TestPojo.class,
ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new
GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig())
instanceof KryoSerializer);
+
+ pojoType.createSerializer(env.getExecutionConfig());
+
+ ValueStateDescriptor<TestPojo> kvId = new
ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state =
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+ assert state instanceof InternalValueState;
+ ((InternalValueState)
state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ // backends that eagerly serializer will fail when updating
state, others will
+ // fail only when performing the snapshot
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ runSnapshot(backend.snapshot(682375462378L, 2,
streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1,
numExceptions);
+ }
+
+
+	/**
+	 * Verify that we can restore a snapshot that was taken without registered types
+	 * after registering types.
+	 */
+	@Test
+	public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		KeyGroupsStateHandle snapshot;
+		try {
+			ValueState<TestPojo> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			// write state for two keys while TestPojo is not yet registered with Kryo
+			backend.setCurrentKey(1);
+			state.update(new TestPojo("u1", 1));
+
+			backend.setCurrentKey(2);
+			state.update(new TestPojo("u2", 2));
+
+			snapshot = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+		} finally {
+			backend.dispose();
+		}
+
+		// register the type only after the snapshot was taken
+		env.getExecutionConfig().registerKryoType(TestPojo.class);
+
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+		try {
+			snapshot.discardState();
+
+			ValueState<TestPojo> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			// JUnit convention: expected value first, actual value second
+			backend.setCurrentKey(1);
+			assertEquals(new TestPojo("u1", 1), state.value());
+
+			backend.setCurrentKey(2);
+			assertEquals(new TestPojo("u2", 2), state.value());
+		} finally {
+			backend.dispose();
+		}
+	}
+
+ /**
+ * Verify that we can restore a snapshot that was done with without
registered default
--- End diff --
"with without" redundancy --> "without", I think ;-)?
> Properly initialise StateDescriptor in
> AbstractStateBackend.getPartitionedState()
> ---------------------------------------------------------------------------------
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, State Backends, Checkpointing
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The code currently in the `AbstractKeyedStateBackend#getPartitionedState`
> method is as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354: stateDescriptor.initializeSerializerUnlessSet(new
> ExecutionConfig());
> line 355: }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
> return serializer != null;
> }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
> if (serializer == null) {
> if (typeInfo != null) {
> serializer =
> typeInfo.createSerializer(executionConfig);
> } else {
> throw new IllegalStateException(
> "Cannot initialize serializer
> after TypeInformation was dropped during serialization");
> }
> }
> }
> {code}
> That is, the `initializeSerializerUnlessSet` method already checks
> `serializer == null`, so the outer `isSerializerInitialized()` check is
> redundant. I suggest improving this code in one of the following ways:
> Approach 1:
> As the `TODO` suggests, throw an exception:
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
> throw new IllegalStateException("The serializer of the
> descriptor has not been initialized!");
> }
> {code}
> Approach 2:
> Always try to initialize, removing the
> `if (!stateDescriptor.isSerializerInitialized()) {` check:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> If we use approach 2, I also suggest adding a `private final ExecutionConfig
> executionConfig` field to `AbstractKeyedStateBackend`, so that the code can be
> changed to:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Do these suggestions seem reasonable to you?
> Any feedback and corrections are welcome.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)