Sihua Zhou created FLINK-9269: --------------------------------- Summary: Concurrency problem in HeapKeyedStateBackend when performing checkpoint async Key: FLINK-9269 URL: https://issues.apache.org/jira/browse/FLINK-9269 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.5.0 Reporter: Sihua Zhou Fix For: 1.5.0
{code:java}
/**
 * Writes the keyed-state snapshot to the checkpoint output stream and returns the resulting handle.
 *
 * <p>Layout written by this method: first the {@code serializationProxy}, then, for every key group
 * in {@code keyGroupRange}, the key-group id followed by one section per entry of
 * {@code stateTables} (a short id taken from {@code kVStateToId} and the mappings of that key
 * group). The stream position at the start of each key group is recorded in a {@code long[]}.
 *
 * <p>NOTE(review): the issue title reports a concurrency problem for asynchronous checkpoints.
 * This method iterates {@code stateTables} and reads {@code kVStateToId} directly; if either can be
 * modified by another thread while this runs, that is the likely culprit -- their declarations and
 * threading are not visible here, TODO confirm.
 *
 * @return the keyed state handle result built from the finalized stream and the recorded key-group
 *         offsets, or {@link SnapshotResult#empty()} when
 *         {@code cancelStreamRegistry.unregisterCloseable(...)} returns {@code false}
 * @throws Exception declared by the signature; propagated from the stream and serialization calls
 */
@Nonnull
@Override
protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
	// Start timestamp, passed to logOperationCompleted(...) on the success path.
	long startTime = System.currentTimeMillis();
	CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.streamAndResultExtractor.getCheckpointOutputStream();
	DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
	// The serialization proxy is written first, ahead of all key-group data.
	serializationProxy.write(outView);
	// One recorded stream position per key group, indexed by position within keyGroupRange.
	long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
	for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
		int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
		// Record where this key group starts before anything of it is written.
		keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
		outView.writeInt(keyGroupId);
		// NOTE(review): iterates stateTables itself rather than something captured with the
		// snapshot; presumably unsafe if states can be registered concurrently -- TODO confirm.
		for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
			// A fresh compression-decorated stream is opened over localStream for every state
			// table of every key group.
			try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
				DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
				// NOTE(review): if kVStateToId holds no id for this state name, get(...) would
				// presumably return null and fail here -- verify the map type and its population.
				kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
				// The snapshot is looked up by the StateTable instance, then writes its mappings
				// for the current key group.
				cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
			} // this will just close the outer compression stream
		}
	}
	// Only finalize when unregistering succeeds. NOTE(review): a false return presumably means
	// the stream was already closed or cancelled elsewhere -- confirm against the registry contract.
	if (cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) {
		KeyGroupRangeOffsets kgOffs = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
		SnapshotResult<StreamStateHandle> result = streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
		// Drop the reference once the stream has been closed and finalized.
		streamAndResultExtractor = null;
		logOperationCompleted(primaryStreamFactory, startTime);
		return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);
	}
	return SnapshotResult.empty();
}
{code}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)