[ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931129#comment-15931129
 ] 

ASF GitHub Bot commented on FLINK-6018:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3562#discussion_r106776658
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
    @@ -162,22 +176,422 @@ protected CheckpointStreamFactory 
createStreamFactory() throws Exception {
        }
     
        @Test
    +   public void testBackendUsesRegisteredKryoDefaultSerializer() throws 
Exception {
    +           CheckpointStreamFactory streamFactory = createStreamFactory();
    +           Environment env = new DummyEnvironment("test", 1, 0);
    +           AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +           // cast because our test serializer is not typed to TestPojo
    +           env.getExecutionConfig()
    +                           .addDefaultKryoSerializer(TestPojo.class, 
(Class) ExceptionThrowingTestSerializer.class);
    +
    +           TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
    +
    +           // make sure that we are in fact using the KryoSerializer
    +           assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
    +
    +           pojoType.createSerializer(env.getExecutionConfig());
    +
    +           ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
    +
    +           ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
    +
    +           // backends that eagerly serializer will fail when updating 
state, others will
    +           // fail only when performing the snapshot
    +           int numExceptions = 0;
    +
    +           backend.setCurrentKey(1);
    +
    +           try {
    +                   state.update(new TestPojo("u1", 1));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           } catch (RuntimeException e) {
    +                   if (e.getCause() instanceof ExpectedKryoTestException) {
    +                           numExceptions++;
    +                   } else {
    +                           throw e;
    +                   }
    +           }
    +
    +           try {
    +                   runSnapshot(backend.snapshot(682375462378L, 2, 
streamFactory, CheckpointOptions.forFullCheckpoint()));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           }
    +
    +           assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
    +   }
    +
    +   @Test
    +   public void 
testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws 
Exception {
    +           CheckpointStreamFactory streamFactory = createStreamFactory();
    +           Environment env = new DummyEnvironment("test", 1, 0);
    +           AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +           // cast because our test serializer is not typed to TestPojo
    +           env.getExecutionConfig()
    +                           .addDefaultKryoSerializer(TestPojo.class, 
(Class) ExceptionThrowingTestSerializer.class);
    +
    +           TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
    +
    +           // make sure that we are in fact using the KryoSerializer
    +           assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
    +
    +           pojoType.createSerializer(env.getExecutionConfig());
    +
    +           ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
    +
    +           ValueState<TestPojo> state = 
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
    +           assert state instanceof InternalValueState;
    +           ((InternalValueState) 
state).setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +           // backends that eagerly serializer will fail when updating 
state, others will
    +           // fail only when performing the snapshot
    +           int numExceptions = 0;
    +
    +           backend.setCurrentKey(1);
    +
    +           try {
    +                   state.update(new TestPojo("u1", 1));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           } catch (RuntimeException e) {
    +                   if (e.getCause() instanceof ExpectedKryoTestException) {
    +                           numExceptions++;
    +                   } else {
    +                           throw e;
    +                   }
    +           }
    +
    +           try {
    +                   runSnapshot(backend.snapshot(682375462378L, 2, 
streamFactory, CheckpointOptions.forFullCheckpoint()));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           }
    +
    +           assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
    +   }
    +
    +   @Test
    +   public void testBackendUsesRegisteredKryoSerializer() throws Exception {
    +           CheckpointStreamFactory streamFactory = createStreamFactory();
    +           Environment env = new DummyEnvironment("test", 1, 0);
    +           AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +           env.getExecutionConfig()
    +                           .registerTypeWithKryoSerializer(TestPojo.class, 
ExceptionThrowingTestSerializer.class);
    +
    +           TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
    +
    +           // make sure that we are in fact using the KryoSerializer
    +           assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
    +
    +           pojoType.createSerializer(env.getExecutionConfig());
    +
    +           ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
    +
    +           ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
    +
    +           // backends that eagerly serializer will fail when updating 
state, others will
    +           // fail only when performing the snapshot
    +           int numExceptions = 0;
    +
    +           backend.setCurrentKey(1);
    +
    +           try {
    +                   state.update(new TestPojo("u1", 1));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           } catch (RuntimeException e) {
    +                   if (e.getCause() instanceof ExpectedKryoTestException) {
    +                           numExceptions++;
    +                   } else {
    +                           throw e;
    +                   }
    +           }
    +
    +           try {
    +                   runSnapshot(backend.snapshot(682375462378L, 2, 
streamFactory, CheckpointOptions.forFullCheckpoint()));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           }
    +
    +           assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
    +   }
    +
    +   @Test
    +   public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() 
throws Exception {
    +           CheckpointStreamFactory streamFactory = createStreamFactory();
    +           Environment env = new DummyEnvironment("test", 1, 0);
    +           AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +           env.getExecutionConfig()
    +                           .registerTypeWithKryoSerializer(TestPojo.class, 
ExceptionThrowingTestSerializer.class);
    +
    +           TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
    +
    +           // make sure that we are in fact using the KryoSerializer
    +           assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
    +
    +           pojoType.createSerializer(env.getExecutionConfig());
    +
    +           ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
    +
    +           ValueState<TestPojo> state = 
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
    +           assert state instanceof InternalValueState;
    +           ((InternalValueState) 
state).setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +           // backends that eagerly serializer will fail when updating 
state, others will
    +           // fail only when performing the snapshot
    +           int numExceptions = 0;
    +
    +           backend.setCurrentKey(1);
    +
    +           try {
    +                   state.update(new TestPojo("u1", 1));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           } catch (RuntimeException e) {
    +                   if (e.getCause() instanceof ExpectedKryoTestException) {
    +                           numExceptions++;
    +                   } else {
    +                           throw e;
    +                   }
    +           }
    +
    +           try {
    +                   runSnapshot(backend.snapshot(682375462378L, 2, 
streamFactory, CheckpointOptions.forFullCheckpoint()));
    +           } catch (ExpectedKryoTestException e) {
    +                   numExceptions++;
    +           }
    +
    +           assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
    +   }
    +
    +
    +   /**
    +    * Verify that we can restore a snapshot that was done with without 
registered types
    +    * after registering types.
    +    */
    +   @Test
    +   public void testKryoRegisteringRestoreResilienceWithRegisteredType() 
throws Exception {
    +           CheckpointStreamFactory streamFactory = createStreamFactory();
    +           Environment env = new DummyEnvironment("test", 1, 0);
    +           AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +           TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
    +
    +           // make sure that we are in fact using the KryoSerializer
    +           assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
    +
    +           pojoType.createSerializer(env.getExecutionConfig());
    +
    +           ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
    +
    +           ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
    +
    +           // make some more modifications
    +           backend.setCurrentKey(1);
    +           state.update(new TestPojo("u1", 1));
    +
    +           backend.setCurrentKey(2);
    +           state.update(new TestPojo("u2", 2));
    +
    +           KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
    +                           682375462378L,
    +                           2,
    +                           streamFactory,
    +                           CheckpointOptions.forFullCheckpoint()));
    +
    +           backend.dispose();
    +
    +           env.getExecutionConfig().registerKryoType(TestPojo.class);
    +           
    +           backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
    +
    +           snapshot.discardState();
    +
    +           state = backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
    +           backend.setCurrentKey(1);
    +           assertEquals(state.value(), new TestPojo("u1", 1));
    +
    +           backend.setCurrentKey(2);
    +           assertEquals(state.value(), new TestPojo("u2", 2));
    +
    +           backend.dispose();
    +   }
    +
    +   /**
    +    * Verify that we can restore a snapshot that was done with without 
registered default
    --- End diff --
    
    "with without" redundancy --> "without", I think ;-)?



> Properly initialise StateDescriptor in 
> AbstractStateBackend.getPartitionedState()
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-6018
>                 URL: https://issues.apache.org/jira/browse/FLINK-6018
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>             Fix For: 1.3.0
>
>
> The code snippet currently in the `AbstractKeyedStateBackend # 
> getPartitionedState` method, as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:        stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 354 }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>               return serializer != null;
>       }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>               if (serializer == null) { 
>                       if (typeInfo != null) {
>                               serializer = 
> typeInfo.createSerializer(executionConfig);
>                       } else {
>                               throw new IllegalStateException(
>                                               "Cannot initialize serializer 
> after TypeInformation was dropped during serialization");
>                       }
>               }
>       }
> {code}
> that is, in the `initializeSerializerUnlessSet` method, The `serializer` has 
> been checked by `serializer == null`.So I hope this code has a little 
> improvement to the following:
> approach 1: 
> According to the `TODO` information  we throw an exception
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>                       throw new IllegalStateException("The serializer of the 
> descriptor has not been initialized!"); 
> }
> {code}
> approach 2:
> Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
> {` logic.
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, If we use the approach 2, I suggest that 
> `AbstractKeyedStateBackend` add a `private final ExecutionConfig 
> executionConfig` property. then we can change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Are the above suggestions reasonable for you? 
> Welcome anybody's feedback and corrections.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to