Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3143#discussion_r96612430
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
    @@ -242,6 +245,132 @@ public void testValueState() throws Exception {
                backend.dispose();
        }
     
    +   /**
    +    * Tests {@link ValueState#value()} and {@link 
KvState#getSerializedValue(byte[])}
    +    * accessing the state concurrently. They should not get in the way of 
each
    +    * other.
    +    */
    +   @Test
    +   @SuppressWarnings("unchecked")
    +   public void testValueStateRace() throws Exception {
    +           final AbstractKeyedStateBackend<Integer> backend =
    +                   createKeyedBackend(IntSerializer.INSTANCE);
    +           final Integer namespace = Integer.valueOf(1);
    +
    +           final ValueStateDescriptor<String> kvId =
    +                   new ValueStateDescriptor<>("id", String.class);
    +           kvId.initializeSerializerUnlessSet(new ExecutionConfig());
    +
    +           final TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
    +           final TypeSerializer<Integer> namespaceSerializer =
    +                   IntSerializer.INSTANCE;
    +           final TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
    +
    +           final ValueState<String> state = backend
    +                   .getPartitionedState(namespace, IntSerializer.INSTANCE, 
kvId);
    +
    +           @SuppressWarnings("unchecked")
    +           final KvState<Integer> kvState = (KvState<Integer>) state;
    +
    +           /**
    +            * 1) Test that ValueState#value() before and after
    +            * KvState#getSerializedValue(byte[]) return the same value.
    +            */
    +
    +           // set some key and namespace
    +           final int key1 = 1;
    +           backend.setCurrentKey(key1);
    +           kvState.setCurrentNamespace(2);
    +           state.update("2");
    +           assertEquals("2", state.value());
    +
    +           // query another key and namespace
    +           assertNull(getSerializedValue(kvState, 3, keySerializer,
    +                   namespace, IntSerializer.INSTANCE,
    +                   valueSerializer));
    +
    +           // the state should not have changed!
    +           assertEquals("2", state.value());
    +
    +           // re-set values
    +           kvState.setCurrentNamespace(namespace);
    +
    +           /**
    +            * 2) Test two threads concurrently using ValueState#value() and
    +            * KvState#getSerializedValue(byte[]).
    +            */
    +
    +           // some modifications to the state
    +           final int key2 = 10;
    +           backend.setCurrentKey(key2);
    +           assertNull(state.value());
    +           assertNull(getSerializedValue(kvState, key2, keySerializer,
    +                   namespace, namespaceSerializer, valueSerializer));
    +           state.update("1");
    +
    +           boolean getterSuccess;
    +           final Throwable[] throwables = {null, null};
    +
    +           final Thread getter = new Thread("State getter") {
    --- End diff --
    
    How about using the `CheckedThread` to avoid the stuff with Throwable 
arrays, etc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to