Repository: flink
Updated Branches:
  refs/heads/release-1.2 289932cdb -> d8222c117


[FLINK-5530] [queryable state] Fix race condition in 
AbstractRocksDBState#getSerializedValue

AbstractRocksDBState#getSerializedValue() uses the same key serialisation
stream as the ordinary state access methods, but it is called in parallel
during state queries, thus violating the assumption that only one thread
accesses that stream.

This may lead to either wrong results in queries or corrupt data while queries
are executed.

This closes #3143.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8222c11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8222c11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8222c11

Branch: refs/heads/release-1.2
Commit: d8222c1177eab32411e88bbf07c2ad2b9fcfc765
Parents: 289932c
Author: Nico Kruber <[email protected]>
Authored: Tue Jan 17 17:38:29 2017 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Sun Jan 22 11:45:21 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |  70 ++++++++---
 .../runtime/state/StateBackendTestBase.java     | 117 +++++++++++++++++++
 .../flink/core/testutils/CheckedThread.java     |  19 +++
 3 files changed, 187 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8222c11/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 9da33ef..6785f17 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -73,7 +73,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
        private final WriteOptions writeOptions;
 
        protected final ByteArrayOutputStreamWithPos keySerializationStream;
-       protected final DataOutputView keySerializationDateDataOutputView;
+       protected final DataOutputView keySerializationDataOutputView;
 
        private final boolean ambiguousKeyPossible;
 
@@ -97,7 +97,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                this.stateDesc = Preconditions.checkNotNull(stateDesc, "State 
Descriptor");
 
                this.keySerializationStream = new 
ByteArrayOutputStreamWithPos(128);
-               this.keySerializationDateDataOutputView = new 
DataOutputViewStreamWrapper(keySerializationStream);
+               this.keySerializationDataOutputView = new 
DataOutputViewStreamWrapper(keySerializationStream);
                this.ambiguousKeyPossible = 
(backend.getKeySerializer().getLength() < 0)
                                && (namespaceSerializer.getLength() < 0);
        }
@@ -132,55 +132,87 @@ public abstract class AbstractRocksDBState<K, N, S 
extends State, SD extends Sta
                                namespaceSerializer);
 
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
-               writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-               return backend.db.get(columnFamily, 
keySerializationStream.toByteArray());
 
+               // we cannot reuse the keySerializationStream member since this 
method
+               // is called concurrently to the other ones and it may thus 
contain garbage
+               ByteArrayOutputStreamWithPos tmpKeySerializationStream = new 
ByteArrayOutputStreamWithPos(128);
+               DataOutputViewStreamWrapper 
tmpKeySerializationDateDataOutputView = new 
DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+               writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+                       tmpKeySerializationStream, 
tmpKeySerializationDateDataOutputView);
+
+               return backend.db.get(columnFamily, 
tmpKeySerializationStream.toByteArray());
        }
 
        protected void writeCurrentKeyWithGroupAndNamespace() throws 
IOException {
-               
writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), 
backend.getCurrentKey(), currentNamespace);
+               writeKeyWithGroupAndNamespace(
+                       backend.getCurrentKeyGroupIndex(),
+                       backend.getCurrentKey(),
+                       currentNamespace,
+                       keySerializationStream,
+                       keySerializationDataOutputView);
        }
 
-       protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N 
namespace) throws IOException {
+       protected void writeKeyWithGroupAndNamespace(
+                       int keyGroup, K key, N namespace,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDataOutputView) throws 
IOException {
+
                keySerializationStream.reset();
-               writeKeyGroup(keyGroup);
-               writeKey(key);
-               writeNameSpace(namespace);
+               writeKeyGroup(keyGroup, keySerializationDataOutputView);
+               writeKey(key, keySerializationStream, 
keySerializationDataOutputView);
+               writeNameSpace(namespace, keySerializationStream, 
keySerializationDataOutputView);
        }
 
-       private void writeKeyGroup(int keyGroup) throws IOException {
+       private void writeKeyGroup(
+                       int keyGroup,
+                       DataOutputView keySerializationDateDataOutputView) 
throws IOException {
                for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
                        keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
                }
        }
 
-       private void writeKey(K key) throws IOException {
+       private void writeKey(
+                       K key,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDataOutputView) throws 
IOException {
                //write key
                int beforeWrite = keySerializationStream.getPosition();
-               backend.getKeySerializer().serialize(key, 
keySerializationDateDataOutputView);
+               backend.getKeySerializer().serialize(key, 
keySerializationDataOutputView);
 
                if (ambiguousKeyPossible) {
                        //write size of key
-                       writeLengthFrom(beforeWrite);
+                       writeLengthFrom(beforeWrite, keySerializationStream,
+                               keySerializationDataOutputView);
                }
        }
 
-       private void writeNameSpace(N namespace) throws IOException {
+       private void writeNameSpace(
+                       N namespace,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDataOutputView) throws 
IOException {
                int beforeWrite = keySerializationStream.getPosition();
-               namespaceSerializer.serialize(namespace, 
keySerializationDateDataOutputView);
+               namespaceSerializer.serialize(namespace, 
keySerializationDataOutputView);
 
                if (ambiguousKeyPossible) {
                        //write length of namespace
-                       writeLengthFrom(beforeWrite);
+                       writeLengthFrom(beforeWrite, keySerializationStream,
+                               keySerializationDataOutputView);
                }
        }
 
-       private void writeLengthFrom(int fromPosition) throws IOException {
+       private static void writeLengthFrom(
+                       int fromPosition,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDateDataOutputView) 
throws IOException {
                int length = keySerializationStream.getPosition() - 
fromPosition;
-               writeVariableIntBytes(length);
+               writeVariableIntBytes(length, 
keySerializationDateDataOutputView);
        }
 
-       private void writeVariableIntBytes(int value) throws IOException {
+       private static void writeVariableIntBytes(
+                       int value,
+                       DataOutputView keySerializationDateDataOutputView)
+                       throws IOException {
                do {
                        keySerializationDateDataOutputView.writeByte(value);
                        value >>>= 8;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8222c11/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 641e14b..38e04aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -54,6 +55,8 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RunnableFuture;
 
@@ -242,6 +245,120 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                backend.dispose();
        }
 
+       /**
+        * Tests {@link ValueState#value()} and {@link 
KvState#getSerializedValue(byte[])}
+        * accessing the state concurrently. They should not get in the way of 
each
+        * other.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testValueStateRace() throws Exception {
+               final AbstractKeyedStateBackend<Integer> backend =
+                       createKeyedBackend(IntSerializer.INSTANCE);
+               final Integer namespace = Integer.valueOf(1);
+
+               final ValueStateDescriptor<String> kvId =
+                       new ValueStateDescriptor<>("id", String.class);
+               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
+               final TypeSerializer<Integer> namespaceSerializer =
+                       IntSerializer.INSTANCE;
+               final TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
+
+               final ValueState<String> state = backend
+                       .getPartitionedState(namespace, IntSerializer.INSTANCE, 
kvId);
+
+               @SuppressWarnings("unchecked")
+               final KvState<Integer> kvState = (KvState<Integer>) state;
+
+               /**
+                * 1) Test that ValueState#value() before and after
+                * KvState#getSerializedValue(byte[]) return the same value.
+                */
+
+               // set some key and namespace
+               final int key1 = 1;
+               backend.setCurrentKey(key1);
+               kvState.setCurrentNamespace(2);
+               state.update("2");
+               assertEquals("2", state.value());
+
+               // query another key and namespace
+               assertNull(getSerializedValue(kvState, 3, keySerializer,
+                       namespace, IntSerializer.INSTANCE,
+                       valueSerializer));
+
+               // the state should not have changed!
+               assertEquals("2", state.value());
+
+               // re-set values
+               kvState.setCurrentNamespace(namespace);
+
+               /**
+                * 2) Test two threads concurrently using ValueState#value() and
+                * KvState#getSerializedValue(byte[]).
+                */
+
+               // some modifications to the state
+               final int key2 = 10;
+               backend.setCurrentKey(key2);
+               assertNull(state.value());
+               assertNull(getSerializedValue(kvState, key2, keySerializer,
+                       namespace, namespaceSerializer, valueSerializer));
+               state.update("1");
+
+               final CheckedThread getter = new CheckedThread("State getter") {
+                       @Override
+                       public void go() throws Exception {
+                               while (!isInterrupted()) {
+                                       assertEquals("1", state.value());
+                               }
+                       }
+               };
+
+               final CheckedThread serializedGetter = new 
CheckedThread("Serialized state getter") {
+                       @Override
+                       public void go() throws Exception {
+                               while(!isInterrupted() && getter.isAlive()) {
+                                       final String serializedValue =
+                                               getSerializedValue(kvState, 
key2, keySerializer,
+                                                       namespace, 
namespaceSerializer,
+                                                       valueSerializer);
+                                       assertEquals("1", serializedValue);
+                               }
+                       }
+               };
+
+               getter.start();
+               serializedGetter.start();
+
+               // run both threads for max 100ms
+               Timer t = new Timer("stopper");
+               t.schedule(new TimerTask() {
+                       @Override
+                       public void run() {
+                               getter.interrupt();
+                               serializedGetter.interrupt();
+                               this.cancel();
+                       }
+               }, 100);
+
+               // wait for both threads to finish
+               try {
+                       // serializedGetter will finish if its assertion fails 
or if
+                       // getter is not alive any more
+                       serializedGetter.sync();
+                       // if serializedGetter crashed, getter will not know -> 
interrupt just in case
+                       getter.interrupt();
+                       getter.sync();
+                       t.cancel(); // if not executed yet
+               } finally {
+                       // clean up
+                       backend.dispose();
+               }
+       }
+
        @Test
        @SuppressWarnings("unchecked")
        public void testMultipleValueStates() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8222c11/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index 1dad8c8..5de6a87 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -37,6 +37,25 @@ public abstract class CheckedThread extends Thread {
        // 
------------------------------------------------------------------------
 
        /**
+        * Unnamed checked thread.
+        */
+       public CheckedThread() {
+               super();
+       }
+
+       /**
+        * Checked thread with a name.
+        *
+        * @param name
+        *              the name of the new thread
+        *
+        * @see Thread#Thread(String)
+        */
+       public CheckedThread(final String name) {
+               super(name);
+       }
+
+       /**
         * This method needs to be overwritten to contain the main work logic.
         * It takes the role of {@link Thread#run()}, but should propagate 
exceptions.
         *

Reply via email to