[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab014ef9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab014ef9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab014ef9 Branch: refs/heads/master Commit: ab014ef94e0e9137ac6f8f41dae385ff71e8ba5b Parents: 30bb958 Author: Stefan Richter <[email protected]> Authored: Fri Mar 3 10:51:15 2017 +0100 Committer: Stefan Richter <[email protected]> Committed: Thu Mar 16 18:34:02 2017 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 5 + .../state/RocksDBStateBackendTest.java | 86 +- .../java/org/apache/flink/util/MathUtils.java | 35 +- .../filesystem/AbstractFsStateSnapshot.java | 30 +- .../state/memory/AbstractMemStateSnapshot.java | 40 +- .../AbstractMigrationRestoreStrategy.java | 117 ++ .../state/memory/MigrationRestoreSnapshot.java | 32 + .../state/AbstractKeyedStateBackend.java | 7 + .../runtime/state/ArrayListSerializer.java | 14 +- .../state/KeyedBackendSerializationProxy.java | 23 +- .../flink/runtime/state/KeyedStateBackend.java | 28 +- .../state/StateTransformationFunction.java | 23 + .../state/filesystem/FsStateBackend.java | 109 +- .../state/heap/AbstractHeapMergingState.java | 93 +- .../runtime/state/heap/AbstractHeapState.java | 84 +- .../state/heap/AbstractStateTableSnapshot.java | 51 + .../state/heap/CopyOnWriteStateTable.java | 1066 ++++++++++++++++++ .../heap/CopyOnWriteStateTableSnapshot.java | 188 +++ .../state/heap/HeapAggregatingState.java | 92 +- .../runtime/state/heap/HeapFoldingState.java | 70 +- .../state/heap/HeapKeyedStateBackend.java | 426 +++---- .../flink/runtime/state/heap/HeapListState.java | 63 +- .../flink/runtime/state/heap/HeapMapState.java | 167 +-- .../runtime/state/heap/HeapReducingState.java | 82 +- .../runtime/state/heap/HeapValueState.java | 47 +- .../runtime/state/heap/InternalKeyContext.java | 60 + .../state/heap/NestedMapsStateTable.java | 363 ++++++ .../flink/runtime/state/heap/StateEntry.java | 44 + .../flink/runtime/state/heap/StateTable.java | 222 ++-- .../state/heap/StateTableByKeyGroupReader.java | 38 + .../state/heap/StateTableByKeyGroupReaders.java | 136 +++ .../runtime/state/heap/StateTableSnapshot.java | 45 + .../state/memory/MemoryStateBackend.java | 26 + .../runtime/query/QueryableStateClientTest.java | 7 +- .../message/KvStateRequestSerializerTest.java | 38 +- .../state/AsyncFileStateBackendTest.java | 27 + .../state/AsyncMemoryStateBackendTest.java | 27 + .../runtime/state/FileStateBackendTest.java | 8 +- .../runtime/state/MemoryStateBackendTest.java | 8 +- .../runtime/state/StateBackendTestBase.java | 216 +++- .../state/heap/CopyOnWriteStateTableTest.java | 486 ++++++++ .../state/heap/HeapAggregatingStateTest.java | 21 +- ...pKeyedStateBackendSnapshotMigrationTest.java | 173 +++ .../runtime/state/heap/HeapListStateTest.java | 17 +- .../state/heap/HeapReducingStateTest.java | 23 +- .../state/heap/HeapStateBackendTestBase.java | 54 + .../StateTableSnapshotCompatibilityTest.java | 118 ++ .../util/BlockerCheckpointStreamFactory.java | 118 ++ .../heap_keyed_statebackend_1_2.snapshot | Bin 0 -> 2068 bytes .../api/windowing/windows/TimeWindow.java | 5 +- ...tractEventTimeWindowCheckpointingITCase.java | 64 +- ...ckendEventTimeWindowCheckpointingITCase.java | 26 + ...ckendEventTimeWindowCheckpointingITCase.java | 26 + ...ckendEventTimeWindowCheckpointingITCase.java | 20 + .../test/state/ManualWindowSpeedITCase.java | 7 +- 55 files changed, 4239 insertions(+), 1162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index aaccc2f..f585d21 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1219,4 +1219,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // expected } } + + @Override + public boolean supportsAsynchronousSnapshots() { + return true; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index c7b5c20..708613b 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -31,14 +31,13 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -364,89 +363,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa assertEquals(null, keyedStateBackend.db); } - static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { - - private final int maxSize; - private int afterNumberInvocations; - private OneShotLatch blocker; - private OneShotLatch waiter; - - MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; - - public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() { - return lastCreatedStream; - } - - public BlockerCheckpointStreamFactory(int maxSize) { - this.maxSize = maxSize; - } - - public void setAfterNumberInvocations(int afterNumberInvocations) { - this.afterNumberInvocations = afterNumberInvocations; - } - - public void setBlockerLatch(OneShotLatch latch) { - this.blocker = latch; - } - - public void setWaiterLatch(OneShotLatch latch) { - this.waiter = latch; - } - - @Override - public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { - waiter.trigger(); - this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) { - - private int afterNInvocations = afterNumberInvocations; - private final OneShotLatch streamBlocker = blocker; - private final OneShotLatch streamWaiter = waiter; - - @Override - public void write(int b) throws IOException { - - if (afterNInvocations > 0) { - --afterNInvocations; - } - - if (0 == afterNInvocations && null != streamBlocker) { - try { - streamBlocker.await(); - } catch (InterruptedException ignored) { - } - } - try { - super.write(b); - } catch (IOException ex) { - if (null != streamWaiter) { - streamWaiter.trigger(); - } - throw ex; - } - - if (0 == afterNInvocations && null != streamWaiter) { - streamWaiter.trigger(); - } - } - - @Override - public void close() { - super.close(); - if (null != streamWaiter) { - streamWaiter.trigger(); - } - } - }; - - return lastCreatedStream; - } - - @Override - public void close() throws Exception { - - } - } - private static class AcceptAllFilter implements IOFileFilter { @Override public boolean accept(File file) { http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-core/src/main/java/org/apache/flink/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java index 074e8ae..1d84a39 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java @@ -140,11 +140,7 @@ public final class MathUtils { code = code * 5 + 0xe6546b64; code ^= 4; - code ^= code >>> 16; - code *= 0x85ebca6b; - code ^= code >>> 13; - code *= 0xc2b2ae35; - code ^= code >>> 16; + code = bitMix(code); if (code >= 0) { return code; @@ -172,6 +168,35 @@ public final class MathUtils { return x + 1; } + /** + * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution. + * + * @param in the long (64-bit)input. + * @return the bit-mixed int (32-bit) output + */ + public static int longToIntWithBitMixing(long in) { + in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L; + in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL; + in = in ^ (in >>> 31); + return (int) in; + } + + /** + * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is + * from Murmur's 32 bit finalizer. + * + * @param in the input value + * @return the bit-mixed output value + */ + public static int bitMix(int in) { + in ^= in >>> 16; + in *= 0x85ebca6b; + in ^= in >>> 13; + in *= 0xc2b2ae35; + in ^= in >>> 16; + return in; + } + // ============================================================================================ /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java index 103c214..a15e49d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java @@ -21,8 +21,16 @@ package org.apache.flink.migration.runtime.state.filesystem; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.migration.runtime.state.KvStateSnapshot; +import org.apache.flink.migration.runtime.state.memory.AbstractMigrationRestoreStrategy; +import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.StateTable; import java.io.IOException; @@ -36,7 +44,7 @@ import java.io.IOException; @Deprecated @SuppressWarnings("deprecation") public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> - extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> { + extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> { private static final long serialVersionUID = 1L; @@ -85,4 +93,24 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte public SD getStateDesc() { return stateDesc; } + + @Override + @SuppressWarnings("unchecked") + public StateTable<K, N, SV> deserialize( + String stateName, + HeapKeyedStateBackend<K> stateBackend) throws IOException { + + final FileSystem fs = getFilePath().getFileSystem(); + try (FSDataInputStream inStream = fs.open(getFilePath())) { + final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream); + AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy = + new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) { + @Override + protected DataInputView openDataInputView() throws IOException { + return inView; + } + }; + return restoreStrategy.deserialize(stateName, stateBackend); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java index 6056578..ff86f7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java @@ -21,17 +21,18 @@ package org.apache.flink.migration.runtime.state.memory; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.migration.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.StateTable; import org.apache.flink.runtime.util.DataInputDeserializer; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; @Deprecated @SuppressWarnings("deprecation") public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> - implements KvStateSnapshot<K, N, S, SD> { + implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> { private static final long serialVersionUID = 1L; @@ -73,24 +74,21 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext this.data = data; } - public HashMap<N, Map<K, SV>> deserialize() throws IOException { - DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length); - - final int numKeys = inView.readInt(); - HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys); - - for (int i = 0; i < numKeys && !closed; i++) { - N namespace = namespaceSerializer.deserialize(inView); - final int numValues = inView.readInt(); - Map<K, SV> namespaceMap = new HashMap<>(numValues); - stateMap.put(namespace, namespaceMap); - for (int j = 0; j < numValues; j++) { - K key = keySerializer.deserialize(inView); - SV value = stateSerializer.deserialize(inView); - namespaceMap.put(key, value); - } - } - return stateMap; + @Override + @SuppressWarnings("unchecked") + public StateTable<K, N, SV> deserialize( + String stateName, + HeapKeyedStateBackend<K> stateBackend) throws IOException { + + final DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length); + AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy = + new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) { + @Override + protected DataInputView openDataInputView() throws IOException { + return inView; + } + }; + return restoreStrategy.deserialize(stateName, stateBackend); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java new file mode 100644 index 0000000..e572619 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.runtime.state.memory; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.VoidSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.StateTable; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * This class outlines the general strategy to restore from migration states. + * + * @param <K> type of key. + * @param <N> type of namespace. + * @param <S> type of state. + */ +@Deprecated +public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements MigrationRestoreSnapshot<K, N, S> { + + /** + * Key Serializer + */ + protected final TypeSerializer<K> keySerializer; + + /** + * Namespace Serializer + */ + protected final TypeSerializer<N> namespaceSerializer; + + /** + * Serializer for the state value + */ + protected final TypeSerializer<S> stateSerializer; + + public AbstractMigrationRestoreStrategy( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<S> stateSerializer) { + + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer); + this.stateSerializer = Preconditions.checkNotNull(stateSerializer); + } + + @Override + public StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException { + + Preconditions.checkNotNull(stateName, "State name is null. Cannot deserialize snapshot."); + Preconditions.checkNotNull(stateBackend, "State backend is null. Cannot deserialize snapshot."); + + final KeyGroupRange keyGroupRange = stateBackend.getKeyGroupRange(); + Preconditions.checkState(1 == keyGroupRange.getNumberOfKeyGroups(), + "Unexpected number of key-groups for restoring from Flink 1.1"); + + TypeSerializer<N> patchedNamespaceSerializer = this.namespaceSerializer; + + if (patchedNamespaceSerializer instanceof VoidSerializer) { + patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE; + } + + RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + stateName, + patchedNamespaceSerializer, + stateSerializer); + + final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo); + final DataInputView inView = openDataInputView(); + final int keyGroup = keyGroupRange.getStartKeyGroup(); + final int numNamespaces = inView.readInt(); + + for (int i = 0; i < numNamespaces; i++) { + N namespace = namespaceSerializer.deserialize(inView); + if (null == namespace) { + namespace = (N) VoidNamespace.INSTANCE; + } + final int numKV = inView.readInt(); + for (int j = 0; j < numKV; j++) { + K key = keySerializer.deserialize(inView); + S value = stateSerializer.deserialize(inView); + stateTable.put(key, keyGroup, namespace, value); + } + } + return stateTable; + } + + /** + * Different state handles require different code to end up with a {@link DataInputView}. + */ + protected abstract DataInputView openDataInputView() throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java new file mode 100644 index 0000000..ea529db --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.runtime.state.memory; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.StateTable; +import org.apache.flink.util.Migration; + +import java.io.IOException; + +@Deprecated +@Internal +public interface MigrationRestoreSnapshot<K, N, S> extends Migration { + StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index aba00f3..1f2f4a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; @@ -254,6 +255,7 @@ public abstract class AbstractKeyedStateBackend<K> /** * @see KeyedStateBackend */ + @Override public KeyGroupRange getKeyGroupRange() { return keyGroupRange; } @@ -382,4 +384,9 @@ public abstract class AbstractKeyedStateBackend<K> public void close() throws IOException { cancelStreamRegistry.close(); } + + @VisibleForTesting + public boolean supportsAsynchronousSnapshots() { + return false; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java index f5a6405..0badb41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java @@ -57,11 +57,17 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { @Override public ArrayList<T> copy(ArrayList<T> from) { - ArrayList<T> newList = new ArrayList<>(from.size()); - for (int i = 0; i < from.size(); i++) { - newList.add(elementSerializer.copy(from.get(i))); + if (elementSerializer.isImmutableType()) { + // fast track using memcopy for immutable types + return new ArrayList<>(from); + } else { + // element-wise deep copy for mutable types + ArrayList<T> newList = new ArrayList<>(from.size()); + for (int i = 0; i < from.size(); i++) { + newList.add(elementSerializer.copy(from.get(i))); + } + return newList; } - return newList; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java index dbee6cb..5661c38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.io.VersionMismatchException; import org.apache.flink.core.io.VersionedIOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -37,11 +38,12 @@ import java.util.List; */ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable { - private static final int VERSION = 1; + public static final int VERSION = 2; private TypeSerializerSerializationProxy<?> keySerializerProxy; private List<StateMetaInfo<?, ?>> namedStateSerializationProxies; + private int restoredVersion; private ClassLoader userCodeClassLoader; public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) { @@ -51,6 +53,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) { this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer)); this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies); + this.restoredVersion = VERSION; Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE); } @@ -67,6 +70,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable return VERSION; } + public int getRestoredVersion() { + return restoredVersion; + } + + @Override + protected void resolveVersionRead(int foundVersion) throws VersionMismatchException { + super.resolveVersionRead(foundVersion); + this.restoredVersion = foundVersion; + } + + @Override + public boolean isCompatibleVersion(int version) { + // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x) + return super.isCompatibleVersion(version) || version == 1; + } + @Override public void write(DataOutputView out) throws IOException { super.write(out); @@ -96,7 +115,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable } } -//---------------------------------------------------------------------------------------------------------------------- + //---------------------------------------------------------------------------------------------------------------------- /** * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java index 15e0491..09e27e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java @@ -21,13 +21,14 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.heap.InternalKeyContext; /** * A keyed state backend provides methods for managing keyed state. * * @param <K> The key by which state is keyed. */ -public interface KeyedStateBackend<K> { +public interface KeyedStateBackend<K> extends InternalKeyContext<K> { /** * Sets the current key that is used for partitioned state. @@ -36,31 +37,6 @@ public interface KeyedStateBackend<K> { void setCurrentKey(K newKey); /** - * Used by states to access the current key. - */ - K getCurrentKey(); - - /** - * Returns the key-group to which the current key belongs. - */ - int getCurrentKeyGroupIndex(); - - /** - * Returns the number of key-groups aka max parallelism. - */ - int getNumberOfKeyGroups(); - - /** - * Returns the key groups for this backend. - */ - KeyGroupsList getKeyGroupRange(); - - /** - * {@link TypeSerializer} for the state backend key type. - */ - TypeSerializer<K> getKeySerializer(); - - /** * Creates or retrieves a keyed state backed by this state backend. * * @param namespaceSerializer The serializer used for the namespace type of the state http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java new file mode 100644 index 0000000..9e12ee5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +public interface StateTransformationFunction<S, T> { + S apply(S previousState, T value) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 2e9198f..e27712c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -66,7 +66,10 @@ public class FsStateBackend extends AbstractStateBackend { /** State below this size will be stored as part of the metadata, rather than in files */ private final int fileStateThreshold; - + + /** Switch to chose between synchronous and asynchronous snapshots */ + private final boolean asynchronousSnapshots; + /** * Creates a new state backend that stores its checkpoint data in the file system and location * defined by the given URI. @@ -99,6 +102,27 @@ public class FsStateBackend extends AbstractStateBackend { * * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), * and the path to the checkpoint data directory. + * @param asynchronousSnapshots Switch to enable asynchronous snapshots. + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException { + this(new Path(checkpointDataUri), asynchronousSnapshots); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public FsStateBackend(Path checkpointDataUri) throws IOException { @@ -118,10 +142,52 @@ public class FsStateBackend extends AbstractStateBackend { * * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), * and the path to the checkpoint data directory. + * @param asynchronousSnapshots Switch to enable asynchronous snapshots. + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException { + this(checkpointDataUri.toUri(), asynchronousSnapshots); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public FsStateBackend(URI checkpointDataUri) throws IOException { - this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD); + this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @param asynchronousSnapshots Switch to enable asynchronous snapshots. + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException { + this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots); } /** @@ -139,17 +205,47 @@ public class FsStateBackend extends AbstractStateBackend { * and the path to the checkpoint data directory. * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, * rather than in files - * + * * @throws IOException Thrown, if no file system can be found for the scheme in the URI. * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds. */ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException { + + this(checkpointDataUri, fileStateSizeThreshold, false); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, + * rather than in files + * @param asynchronousSnapshots Switch to enable asynchronous snapshots. + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend( + URI checkpointDataUri, + int fileStateSizeThreshold, + boolean asynchronousSnapshots) throws IOException { + checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger."); - checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, + checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, "The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD); this.fileStateThreshold = fileStateSizeThreshold; this.basePath = validateAndNormalizeUri(checkpointDataUri); + + this.asynchronousSnapshots = asynchronousSnapshots; } /** @@ -166,9 +262,9 @@ public class FsStateBackend extends AbstractStateBackend { * Gets the threshold below which state is stored as part of the metadata, rather than in files. * This threshold ensures that the backend does not create a large amount of very small files, * where potentially the file pointers are larger than the state itself. - * + * * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}. - * + * * @return The file size threshold, in bytes. */ public int getMinFileSizeThreshold() { @@ -209,6 +305,7 @@ public class FsStateBackend extends AbstractStateBackend { env.getUserClassLoader(), numberOfKeyGroups, keyGroupRange, + asynchronousSnapshots, env.getExecutionConfig()); } http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java index 4ac7125..3e76423 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java @@ -22,18 +22,15 @@ import org.apache.flink.api.common.state.MergingState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.runtime.state.internal.InternalMergingState; import java.util.Collection; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkState; /** * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState}) * that is stored on the heap. - * + * * @param <K> The type of the key. * @param <N> The type of the namespace. * @param <SV> The type of the values in the state. @@ -45,21 +42,25 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat implements InternalMergingState<N, IN, OUT> { /** + * The merge transformation function that implements the merge logic. + */ + private final MergeTransformation mergeTransformation; + + /** * Creates a new key/value state for the given hash map of key/value pairs. * - * @param backend The state backend backing that created this state. * @param stateDesc The state identifier for the state. This contains name * and can create a default state value. * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. */ protected AbstractHeapMergingState( - KeyedStateBackend<K> backend, SD stateDesc, StateTable<K, N, SV> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) { - super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); + super(stateDesc, stateTable, keySerializer, namespaceSerializer); + this.mergeTransformation = new MergeTransformation(); } @Override @@ -68,56 +69,40 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat return; // nothing to do } - final K key = backend.getCurrentKey(); - checkState(key != null, "No key set."); - - final Map<N, Map<K, SV>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex()); - - if (namespaceMap != null) { - SV merged = null; - - // merge the sources - for (N source : sources) { - Map<K, SV> keysForNamespace = namespaceMap.get(source); - if (keysForNamespace != null) { - // get and remove the next source per namespace/key - SV sourceState = keysForNamespace.remove(key); - - // if the namespace map became empty, remove - if (keysForNamespace.isEmpty()) { - namespaceMap.remove(source); - } - - if (merged != null && sourceState != null) { - merged = mergeState(merged, sourceState); - } - else if (merged == null) { - merged = sourceState; - } - } - } + final StateTable<K, N, SV> map = stateTable; + + SV merged = null; + + // merge the sources + for (N source : sources) { - // merge into the target, if needed - if (merged != null) { - Map<K, SV> keysForTarget = namespaceMap.get(target); - if (keysForTarget == null) { - keysForTarget = createNewMap(); - namespaceMap.put(target, keysForTarget); - } - SV targetState = keysForTarget.get(key); - - if (targetState != null) { - targetState = mergeState(targetState, merged); - } - else { - targetState = merged; - } - keysForTarget.put(key, targetState); + // get and remove the next source per namespace/key + SV sourceState = map.removeAndGetOld(source); + + if (merged != null && sourceState != null) { + merged = mergeState(merged, sourceState); + } else if (merged == null) { + merged = sourceState; } } - // else no entries for that key at all, nothing to do skip + // merge into the target, if needed + if (merged != null) { + map.transform(target, merged, mergeTransformation); + } } protected abstract SV mergeState(SV a, SV b) throws Exception; -} + + final class MergeTransformation implements StateTransformationFunction<SV, SV> { + + @Override + public SV apply(SV targetState, SV merged) throws Exception { + if (targetState != null) { + return mergeState(targetState, merged); + } else { + return merged; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java index 18b71de..7e1123d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.heap; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; @@ -25,18 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.util.Preconditions; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * Base class for partitioned {@link ListState} implementations that are backed by a regular * heap hash map. The concrete implementations define how the state is checkpointed. - * + * * @param <K> The type of the key. * @param <N> The type of the namespace. * @param <SV> The type of the values in the state. @@ -53,9 +48,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St protected final SD stateDesc; /** The current namespace, which the access methods will refer to. */ - protected N currentNamespace = null; - - protected final KeyedStateBackend<K> backend; + protected N currentNamespace; protected final TypeSerializer<K> keySerializer; @@ -64,58 +57,28 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St /** * Creates a new key/value state for the given hash map of key/value pairs. * - * @param backend The state backend backing that created this state. * @param stateDesc The state identifier for the state. This contains name * and can create a default state value. * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. */ protected AbstractHeapState( - KeyedStateBackend<K> backend, SD stateDesc, StateTable<K, N, SV> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) { - Preconditions.checkNotNull(stateTable, "State table must not be null."); - - this.backend = backend; this.stateDesc = stateDesc; - this.stateTable = stateTable; + this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null."); this.keySerializer = keySerializer; this.namespaceSerializer = namespaceSerializer; + this.currentNamespace = null; } // ------------------------------------------------------------------------ @Override public final void clear() { - Preconditions.checkState(currentNamespace != null, "No namespace set."); - Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); - - Map<N, Map<K, SV>> namespaceMap = - stateTable.get(backend.getCurrentKeyGroupIndex()); - - if (namespaceMap == null) { - return; - } - - Map<K, SV> keyedMap = namespaceMap.get(currentNamespace); - - if (keyedMap == null) { - return; - } - - SV removed = keyedMap.remove(backend.getCurrentKey()); - - if (removed == null) { - return; - } - - if (!keyedMap.isEmpty()) { - return; - } - - namespaceMap.remove(currentNamespace); + stateTable.remove(currentNamespace); } @Override @@ -137,20 +100,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St Preconditions.checkState(namespace != null, "No namespace given."); Preconditions.checkState(key != null, "No key given."); - Map<N, Map<K, SV>> namespaceMap = - stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups())); - - if (namespaceMap == null) { - return null; - } - - Map<K, SV> keyedMap = namespaceMap.get(currentNamespace); - - if (keyedMap == null) { - return null; - } - - SV result = keyedMap.get(key); + SV result = stateTable.get(key, namespace); if (result == null) { return null; @@ -158,30 +108,14 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St @SuppressWarnings("unchecked,rawtypes") TypeSerializer serializer = stateDesc.getSerializer(); - return KvStateRequestSerializer.serializeValue(result, serializer); } /** - * Creates a new map for use in Heap based state. - * - * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, this - * will create a concurrent hash map instead of a regular one. - * - * @return A new namespace map. - */ - protected <MK, MV> Map<MK, MV> createNewMap() { - if (stateDesc.isQueryable()) { - return new ConcurrentHashMap<>(); - } else { - return new HashMap<>(); - } - } - - /** * This should only be used for testing. */ + @VisibleForTesting public StateTable<K, N, SV> getStateTable() { return stateTable; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java new file mode 100644 index 0000000..b0d7727 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +/** + * Abstract class to encapsulate the logic to take snapshots of {@link StateTable} implementations and also defines how + * the snapshot is written during the serialization phase of checkpointing. + */ +@Internal +abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot { + + /** + * The {@link StateTable} from which this snapshot was created. + */ + final T owningStateTable; + + /** + * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table. + * + * @param owningStateTable the {@link StateTable} for which this object represents a snapshot. + */ + AbstractStateTableSnapshot(T owningStateTable) { + this.owningStateTable = Preconditions.checkNotNull(owningStateTable); + } + + /** + * Optional hook to release resources for this snapshot at the end of its lifecycle. + */ + @Override + public void release() { + } +} \ No newline at end of file
