Repository: flink
Updated Branches:
  refs/heads/master de10b4009 -> 14e7d35f2


[FLINK-9269][state] Fix concurrency problem when performing checkpoint in HeapKeyedStateBackend.

This closes #5934.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14e7d35f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14e7d35f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14e7d35f

Branch: refs/heads/master
Commit: 14e7d35f26f51672cb90b63f16f349e6eb846b23
Parents: de10b40
Author: sihuazhou <[email protected]>
Authored: Sat Apr 28 10:50:41 2018 +0800
Committer: Stefan Richter <[email protected]>
Committed: Thu May 3 09:26:28 2018 +0200

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       | 19 +++---
 .../runtime/state/StateBackendTestBase.java     | 69 ++++++++++++++++++--
 .../state/RocksDBKeyedStateBackend.java         |  2 +
 3 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14e7d35f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 568ab3d..d0fb8a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -68,7 +68,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -615,20 +614,23 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        final Map<String, Integer> kVStateToId = new 
HashMap<>(stateTables.size());
 
-                       final Map<StateTable<K, ?, ?>, StateTableSnapshot> 
cowStateStableSnapshots =
-                               new HashedMap(stateTables.size());
+                       final Map<String, StateTableSnapshot> 
cowStateStableSnapshots =
+                               new HashMap<>(stateTables.size());
 
                        for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
-                               kVStateToId.put(kvState.getKey(), 
kVStateToId.size());
+                               String stateName = kvState.getKey();
+                               kVStateToId.put(stateName, kVStateToId.size());
                                StateTable<K, ?, ?> stateTable = 
kvState.getValue();
                                if (null != stateTable) {
                                        
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
-                                       cowStateStableSnapshots.put(stateTable, 
stateTable.createSnapshot());
+                                       cowStateStableSnapshots.put(stateName, 
stateTable.createSnapshot());
                                }
                        }
 
                        final KeyedBackendSerializationProxy<K> 
serializationProxy =
                                new KeyedBackendSerializationProxy<>(
+                                       // TODO: this code assumes that writing 
a serializer is threadsafe, we should support to
+                                       // get a serialized form already at 
state registration time in the future
                                        keySerializer,
                                        metaInfoSnapshots,
                                        
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
@@ -703,11 +705,12 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                        
keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
                                                        
outView.writeInt(keyGroupId);
 
-                                                       for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+                                                       for (Map.Entry<String, 
StateTableSnapshot> kvState : cowStateStableSnapshots.entrySet()) {
                                                                try 
(OutputStream kgCompressionOut = 
keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
+                                                                       String 
stateName = kvState.getKey();
                                                                        
DataOutputViewStreamWrapper kgCompressionView = new 
DataOutputViewStreamWrapper(kgCompressionOut);
-                                                                       
kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
-                                                                       
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView,
 keyGroupId);
+                                                                       
kgCompressionView.writeShort(kVStateToId.get(stateName));
+                                                                       
kvState.getValue().writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
                                                                } // this will 
just close the outer compression stream
                                                        }
                                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/14e7d35f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5737964..0c843db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -78,11 +74,17 @@ import 
org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
-import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -101,9 +103,14 @@ import java.util.PrimitiveIterator;
 import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
@@ -3723,6 +3730,58 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
        }
 
+       @Test
+       public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() 
throws Exception {
+
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment();
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               ExecutorService executorService = 
Executors.newScheduledThreadPool(1);
+               try {
+                       long checkpointID = 0;
+                       List<Future> futureList = new ArrayList();
+                       for (int i = 0; i < 10; ++i) {
+                               ValueStateDescriptor<Integer> kvId = new 
ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
+                               ValueState<Integer> state = 
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+                               ((InternalValueState) 
state).setCurrentNamespace(VoidNamespace.INSTANCE);
+                               backend.setCurrentKey(i);
+                               state.update(i);
+
+                               futureList.add(runSnapshotAsync(executorService,
+                                       backend.snapshot(checkpointID++, 
System.currentTimeMillis(), streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation())));
+                       }
+
+                       for (Future future : futureList) {
+                               future.get(20, TimeUnit.SECONDS);
+                       }
+               } catch (Exception e) {
+                       fail();
+               } finally {
+                       backend.dispose();
+                       executorService.shutdown();
+               }
+       }
+
+       protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
+               ExecutorService executorService,
+               RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotRunnableFuture) throws Exception {
+
+               if (!snapshotRunnableFuture.isDone()) {
+                       CompletableFuture<SnapshotResult<KeyedStateHandle>> 
completableFuture = new CompletableFuture<>();
+                       executorService.submit(() -> {
+                               try {
+                                       snapshotRunnableFuture.run();
+                                       
completableFuture.complete(snapshotRunnableFuture.get());
+                               } catch (Exception e) {
+                                       
completableFuture.completeExceptionally(e);
+                               }
+                       });
+                       return completableFuture;
+               }
+               return 
CompletableFuture.completedFuture(snapshotRunnableFuture.get());
+       }
+
        /**
         * Returns the value by getting the serialized value and deserializing 
it
         * if it is not null.

http://git-wip-us.apache.org/repos/asf/flink/blob/14e7d35f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 2990212..469247c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -2009,6 +2009,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        KeyedBackendSerializationProxy<K> serializationProxy =
                                new KeyedBackendSerializationProxy<>(
+                                       // TODO: this code assumes that writing 
a serializer is threadsafe, we should support to
+                                       // get a serialized form already at 
state registration time in the future
                                        stateBackend.getKeySerializer(),
                                        stateMetaInfoSnapshots,
                                        !Objects.equals(

Reply via email to