Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r165344648
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
---
@@ -630,19 +506,210 @@ public int numStateEntries(Object namespace) {
return sum;
}
- public <N, V> StateTable<K, N, V>
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
- return asynchronousSnapshots ?
- new CopyOnWriteStateTable<>(this, newMetaInfo) :
- new NestedMapsStateTable<>(this, newMetaInfo);
- }
-
@Override
public boolean supportsAsynchronousSnapshots() {
- return asynchronousSnapshots;
+ return snapshotStrategy.isAsynchronous();
}
@VisibleForTesting
public FsStateBackend.LocalRecoveryConfig getLocalRecoveryConfig() {
return localRecoveryConfig;
}
+
+ /**
+ * Base class for the snapshots of the heap backend that outlines the
algorithm and offers some hooks to realize
+ * the concrete strategies.
+ */
+ private abstract class HeapSnapshotStrategy implements
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
+
+ @Override
+ public RunnableFuture<SnapshotResult<KeyedStateHandle>>
performSnapshot(
+ long checkpointId,
+ long timestamp,
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
+
+ if (!hasRegisteredState()) {
+ return DoneFuture.nullValue();
+ }
+
+ long syncStartTime = System.currentTimeMillis();
+
+ Preconditions.checkState(stateTables.size() <=
Short.MAX_VALUE,
+ "Too many KV-States: " + stateTables.size() +
+ ". Currently at most " +
Short.MAX_VALUE + " states are supported");
+
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> metaInfoSnapshots =
+ new ArrayList<>(stateTables.size());
+
+ final Map<String, Integer> kVStateToId = new
HashMap<>(stateTables.size());
+
+ final Map<StateTable<K, ?, ?>, StateTableSnapshot>
cowStateStableSnapshots =
+ new HashedMap(stateTables.size());
+
+ for (Map.Entry<String, StateTable<K, ?, ?>> kvState :
stateTables.entrySet()) {
+ kVStateToId.put(kvState.getKey(),
kVStateToId.size());
+ StateTable<K, ?, ?> stateTable =
kvState.getValue();
+ if (null != stateTable) {
+
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
+ cowStateStableSnapshots.put(stateTable,
stateTable.createSnapshot());
+ }
+ }
+
+ final KeyedBackendSerializationProxy<K>
serializationProxy =
+ new KeyedBackendSerializationProxy<>(
+ keySerializer,
+ metaInfoSnapshots,
+
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE,
keyGroupCompressionDecorator));
+
+ //---------------------------------------------------
this becomes the end of sync part
+
+ // implementation of the async IO operation, based on
FutureTask
+ final
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable
=
+ new
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
+
+
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
+
+ @Override
+ protected void acquireResources()
throws Exception {
+ stream =
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+
cancelStreamRegistry.registerCloseable(stream);
+ }
+
+ @Override
+ protected void releaseResources()
throws Exception {
+
+ if
(cancelStreamRegistry.unregisterCloseable(stream)) {
+
IOUtils.closeQuietly(stream);
+ stream = null;
+ }
+
+ for (StateTableSnapshot
tableSnapshot : cowStateStableSnapshots.values()) {
+ tableSnapshot.release();
+ }
+ }
+
+ @Override
+ protected void stopOperation() throws
Exception {
+ if
(cancelStreamRegistry.unregisterCloseable(stream)) {
+
IOUtils.closeQuietly(stream);
+ stream = null;
+ }
+ }
+
+ @Override
+ protected
SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
+
+ long startTime =
System.currentTimeMillis();
+
+
CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
+
+ DataOutputViewStreamWrapper
outView = new DataOutputViewStreamWrapper(localStream);
+
serializationProxy.write(outView);
+
+ long[] keyGroupRangeOffsets =
new long[keyGroupRange.getNumberOfKeyGroups()];
+
+ for (int keyGroupPos = 0;
keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+ int keyGroupId =
keyGroupRange.getKeyGroupId(keyGroupPos);
+
keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
+
outView.writeInt(keyGroupId);
+
+ for (Map.Entry<String,
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ OutputStream
kgCompressionOut =
keyGroupCompressionDecorator.decorateWithCompression(localStream);
--- End diff --
Can we put this in a try with resource statement?
---