Repository: flink Updated Branches: refs/heads/master de10b4009 -> 14e7d35f2
[FLINK-9269][state] Fix concurrency problem when performing checkpoint in HeapKeyedStateBackend. This closes #5934. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14e7d35f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14e7d35f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14e7d35f Branch: refs/heads/master Commit: 14e7d35f26f51672cb90b63f16f349e6eb846b23 Parents: de10b40 Author: sihuazhou <[email protected]> Authored: Sat Apr 28 10:50:41 2018 +0800 Committer: Stefan Richter <[email protected]> Committed: Thu May 3 09:26:28 2018 +0200 ---------------------------------------------------------------------- .../state/heap/HeapKeyedStateBackend.java | 19 +++--- .../runtime/state/StateBackendTestBase.java | 69 ++++++++++++++++++-- .../state/RocksDBKeyedStateBackend.java | 2 + 3 files changed, 77 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/14e7d35f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 568ab3d..d0fb8a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -68,7 +68,6 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.function.SupplierWithException; -import org.apache.commons.collections.map.HashedMap; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -615,20 +614,23 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size()); - final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = - new HashedMap(stateTables.size()); + final Map<String, StateTableSnapshot> cowStateStableSnapshots = + new HashMap<>(stateTables.size()); for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { - kVStateToId.put(kvState.getKey(), kVStateToId.size()); + String stateName = kvState.getKey(); + kVStateToId.put(stateName, kVStateToId.size()); StateTable<K, ?, ?> stateTable = kvState.getValue(); if (null != stateTable) { metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot()); - cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot()); + cowStateStableSnapshots.put(stateName, stateTable.createSnapshot()); } } final KeyedBackendSerializationProxy<K> serializationProxy = new KeyedBackendSerializationProxy<>( + // TODO: this code assumes that writing a serializer is threadsafe, we should support to + // get a serialized form already at state registration time in the future keySerializer, metaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator)); @@ -703,11 +705,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { keyGroupRangeOffsets[keyGroupPos] = localStream.getPos(); outView.writeInt(keyGroupId); - for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { + for (Map.Entry<String, StateTableSnapshot> kvState : cowStateStableSnapshots.entrySet()) { try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream)) { + String stateName = kvState.getKey(); DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut); - kgCompressionView.writeShort(kVStateToId.get(kvState.getKey())); - cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId); + kgCompressionView.writeShort(kVStateToId.get(stateName)); + kvState.getValue().writeMappingsInKeyGroup(kgCompressionView, keyGroupId); } // this will just close the outer compression stream } } http://git-wip-us.apache.org/repos/asf/flink/blob/14e7d35f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 5737964..0c843db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -18,10 +18,6 @@ package org.apache.flink.runtime.state; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FoldFunction; @@ -78,11 +74,17 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; -import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; import org.apache.flink.types.IntValue; import org.apache.flink.util.IOUtils; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -101,9 +103,14 @@ import java.util.PrimitiveIterator; import java.util.Random; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static java.util.Arrays.asList; @@ -3723,6 +3730,58 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } } + @Test + public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception { + + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment(); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + ExecutorService executorService = Executors.newScheduledThreadPool(1); + try { + long checkpointID = 0; + List<Future> futureList = new ArrayList(); + for (int i = 0; i < 10; ++i) { + ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE); + ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId); + ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE); + backend.setCurrentKey(i); + state.update(i); + + futureList.add(runSnapshotAsync(executorService, + backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()))); + } + + for (Future future : futureList) { + future.get(20, TimeUnit.SECONDS); + } + } catch (Exception e) { + fail(); + } finally { + backend.dispose(); + executorService.shutdown(); + } + } + + protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync( + ExecutorService executorService, + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception { + + if (!snapshotRunnableFuture.isDone()) { + CompletableFuture<SnapshotResult<KeyedStateHandle>> completableFuture = new CompletableFuture<>(); + executorService.submit(() -> { + try { + snapshotRunnableFuture.run(); + completableFuture.complete(snapshotRunnableFuture.get()); + } catch (Exception e) { + completableFuture.completeExceptionally(e); + } + }); + return completableFuture; + } + return CompletableFuture.completedFuture(snapshotRunnableFuture.get()); + } + /** * Returns the value by getting the serialized value and deserializing it * if it is not null. http://git-wip-us.apache.org/repos/asf/flink/blob/14e7d35f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 2990212..469247c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -2009,6 +2009,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { KeyedBackendSerializationProxy<K> serializationProxy = new KeyedBackendSerializationProxy<>( + // TODO: this code assumes that writing a serializer is threadsafe, we should support to + // get a serialized form already at state registration time in the future stateBackend.getKeySerializer(), stateMetaInfoSnapshots, !Objects.equals(
