[
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sihua Zhou reassigned FLINK-9269:
---------------------------------
Assignee: Sihua Zhou
> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -----------------------------------------------------------------------------
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected
> SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
> long startTime =
> System.currentTimeMillis();
>
> CheckpointStreamFactory.CheckpointStateOutputStream localStream =
>
> this.streamAndResultExtractor.getCheckpointOutputStream();
> DataOutputViewStreamWrapper
> outView = new DataOutputViewStreamWrapper(localStream);
>
> serializationProxy.write(outView);
> long[] keyGroupRangeOffsets =
> new long[keyGroupRange.getNumberOfKeyGroups()];
> for (int keyGroupPos = 0;
> keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
> int keyGroupId =
> keyGroupRange.getKeyGroupId(keyGroupPos);
>
> keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>
> outView.writeInt(keyGroupId);
> for (Map.Entry<String,
> StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
> try
> (OutputStream kgCompressionOut =
> keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
>
> DataOutputViewStreamWrapper kgCompressionView = new
> DataOutputViewStreamWrapper(kgCompressionOut);
>
> kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
>
> cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView,
> keyGroupId);
> } // this will just close the outer compression stream
> }
> }
> if
> (cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) {
> KeyGroupRangeOffsets
> kgOffs = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
>
> SnapshotResult<StreamStateHandle> result =
>
> streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
>
> streamAndResultExtractor = null;
>
> logOperationCompleted(primaryStreamFactory, startTime);
> return
> CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result,
> kgOffs);
> }
> return SnapshotResult.empty();
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)