[
https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827896#comment-15827896
]
ASF GitHub Bot commented on FLINK-5530:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3143#discussion_r96612430
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
@@ -242,6 +245,132 @@ public void testValueState() throws Exception {
backend.dispose();
}
+ /**
+ * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
+ * accessing the state concurrently. They should not get in the way of each
+ * other.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testValueStateRace() throws Exception {
+ final AbstractKeyedStateBackend<Integer> backend =
+ createKeyedBackend(IntSerializer.INSTANCE);
+ final Integer namespace = Integer.valueOf(1);
+
+ final ValueStateDescriptor<String> kvId =
+ new ValueStateDescriptor<>("id", String.class);
+ kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ final TypeSerializer<Integer> namespaceSerializer =
+ IntSerializer.INSTANCE;
+ final TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+ final ValueState<String> state = backend
+ .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
+
+ @SuppressWarnings("unchecked")
+ final KvState<Integer> kvState = (KvState<Integer>) state;
+
+ /**
+ * 1) Test that ValueState#value() before and after
+ * KvState#getSerializedValue(byte[]) return the same value.
+ */
+
+ // set some key and namespace
+ final int key1 = 1;
+ backend.setCurrentKey(key1);
+ kvState.setCurrentNamespace(2);
+ state.update("2");
+ assertEquals("2", state.value());
+
+ // query another key and namespace
+ assertNull(getSerializedValue(kvState, 3, keySerializer,
+ namespace, IntSerializer.INSTANCE,
+ valueSerializer));
+
+ // the state should not have changed!
+ assertEquals("2", state.value());
+
+ // re-set values
+ kvState.setCurrentNamespace(namespace);
+
+ /**
+ * 2) Test two threads concurrently using ValueState#value() and
+ * KvState#getSerializedValue(byte[]).
+ */
+
+ // some modifications to the state
+ final int key2 = 10;
+ backend.setCurrentKey(key2);
+ assertNull(state.value());
+ assertNull(getSerializedValue(kvState, key2, keySerializer,
+ namespace, namespaceSerializer, valueSerializer));
+ state.update("1");
+
+ boolean getterSuccess;
+ final Throwable[] throwables = {null, null};
+
+ final Thread getter = new Thread("State getter") {
--- End diff --
How about using `CheckedThread` here? It would avoid the manual bookkeeping
with the `Throwable` array, etc.
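
To illustrate the suggestion, here is a minimal, self-contained sketch of the pattern. `SimpleCheckedThread` and `runGetter` are hypothetical names written for this example and are not Flink's actual `CheckedThread`; the sketch only assumes the general idea of a thread that records any failure from its body and rethrows it in the test thread when joined.

```java
import java.util.concurrent.atomic.AtomicReference;

class CheckedThreadSketch {

    /** Hypothetical stand-in: a thread that remembers any error thrown by its body. */
    abstract static class SimpleCheckedThread extends Thread {
        private volatile Throwable error;

        SimpleCheckedThread(String name) {
            super(name);
        }

        /** Test logic goes here; it may throw freely. */
        abstract void go() throws Exception;

        @Override
        public final void run() {
            try {
                go();
            } catch (Throwable t) {
                error = t; // remember the failure instead of losing it
            }
        }

        /** Waits for the thread to finish and rethrows whatever it failed with. */
        void sync() throws Exception {
            join();
            Throwable t = error;
            if (t instanceof Error) {
                throw (Error) t; // includes AssertionError from test assertions
            }
            if (t instanceof Exception) {
                throw (Exception) t;
            }
            if (t != null) {
                throw new Exception(t);
            }
        }
    }

    /** Runs a getter thread against a shared value and returns what it observed. */
    public static String runGetter(final AtomicReference<String> state) throws Exception {
        final String[] seen = new String[1];
        SimpleCheckedThread getter = new SimpleCheckedThread("State getter") {
            @Override
            void go() {
                String value = state.get();
                if (value == null) {
                    throw new IllegalStateException("state is empty");
                }
                seen[0] = value;
            }
        };
        getter.start();
        getter.sync(); // a failure inside go() surfaces here, in the calling thread
        return seen[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runGetter(new AtomicReference<>("1")));
    }
}
```

With this shape the test no longer needs a `Throwable[]` or a success flag per thread: each thread is started, and a single `sync()` call both joins it and fails the test if its body failed.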
> race condition in AbstractRocksDBState#getSerializedValue
> ---------------------------------------------------------
>
> Key: FLINK-5530
> URL: https://issues.apache.org/jira/browse/FLINK-5530
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.2.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
>
> AbstractRocksDBState#getSerializedValue() uses the same key serialisation
> stream as the ordinary state access methods, but it is called in parallel
> during state queries, violating the assumption that only one thread accesses
> the stream. This may lead to either wrong query results or corrupt data while
> queries are executed.
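
The hazard described in the issue can be sketched in isolation. The example below is an illustration under assumptions, not the code from the pull request: `KeySerializationSketch` and both method names are hypothetical, and a plain `ByteArrayOutputStream` stands in for the shared key serialisation stream. One common remedy, shown in the second method, is to give the query path its own buffer per call.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class KeySerializationSketch {

    // Shared buffer, standing in for a stream reused by the ordinary state access path.
    // Reuse is only safe while a single thread ever touches it.
    private final ByteArrayOutputStream sharedBuffer = new ByteArrayOutputStream();

    /** Single-threaded path: reuses the shared buffer to avoid allocations. */
    public byte[] serializeKeyShared(int key, int namespace) throws IOException {
        // If a second thread calls this concurrently, its reset() can wipe
        // bytes this call has already written, producing a wrong key.
        sharedBuffer.reset();
        DataOutputStream out = new DataOutputStream(sharedBuffer);
        out.writeInt(key);
        out.writeInt(namespace);
        return sharedBuffer.toByteArray();
    }

    /** Query path: a private buffer per call, so concurrent callers cannot interleave. */
    public static byte[] serializeKeyIsolated(int key, int namespace) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(8);
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(key);
        out.writeInt(namespace);
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serializeKeyIsolated(10, 1);
        System.out.println(bytes.length);
    }
}
```

The per-call buffer trades a small allocation for independence from the state access thread; whether that trade-off matches the actual fix is not stated in this message.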