[
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993344#comment-15993344
]
ASF GitHub Bot commented on FLINK-6364:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3801#discussion_r114360519
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws
InterruptedException {
}
}
+ private static class RocksDBIncrementalSnapshotOperation {
+
+ private final RocksDBKeyedStateBackend<?> stateBackend;
+
+ private final CheckpointStreamFactory checkpointStreamFactory;
+
+ private final long checkpointId;
+
+ private final long checkpointTimestamp;
+
+ private Map<String, StreamStateHandle> baseSstFiles;
+
+ private List<KeyedBackendSerializationProxy.StateMetaInfo<?,
?>> stateMetaInfos = new ArrayList<>();
+
+ private FileSystem backupFileSystem;
+ private Path backupPath;
+
+ private FSDataInputStream inputStream = null;
+ private CheckpointStreamFactory.CheckpointStateOutputStream
outputStream = null;
+
+ // new sst files since the last completed checkpoint
+ private Set<String> newSstFileNames = new HashSet<>();
+
+ // handles to the sst files in the current snapshot
+ private Map<String, StreamStateHandle> sstFiles = new
HashMap<>();
+
+ // handles to the misc files in the current snapshot
+ private Map<String, StreamStateHandle> miscFiles = new
HashMap<>();
+
+ private StreamStateHandle metaStateHandle = null;
+
+ private RocksDBIncrementalSnapshotOperation(
+ RocksDBKeyedStateBackend<?> stateBackend,
+ CheckpointStreamFactory checkpointStreamFactory,
+ long checkpointId,
+ long checkpointTimestamp) {
+
+ this.stateBackend = stateBackend;
+ this.checkpointStreamFactory = checkpointStreamFactory;
+ this.checkpointId = checkpointId;
+ this.checkpointTimestamp = checkpointTimestamp;
+ }
+
+ private StreamStateHandle materializeStateData(Path filePath)
throws Exception {
+ try {
+ final byte[] buffer = new byte[1024];
+
+ FileSystem backupFileSystem =
backupPath.getFileSystem();
+ inputStream = backupFileSystem.open(filePath);
+
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ while (true) {
+ int numBytes = inputStream.read(buffer);
+
+ if (numBytes == -1) {
+ break;
+ }
+
+ outputStream.write(buffer, 0, numBytes);
+ }
+
+ return outputStream.closeAndGetHandle();
+ } finally {
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ inputStream.close();
+ inputStream = null;
+ }
+
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+ }
+
+ private StreamStateHandle materializeMetaData() throws
Exception {
+ try {
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ KeyedBackendSerializationProxy
serializationProxy =
+ new
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+ DataOutputView out = new
DataOutputViewStreamWrapper(outputStream);
+
+ serializationProxy.write(out);
+
+ return outputStream.closeAndGetHandle();
+ } finally {
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+ }
+
+ void takeSnapshot() throws Exception {
+ // use the last completed checkpoint as the comparison
base.
+ baseSstFiles =
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+
+ // save meta data
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle,
RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry :
stateBackend.kvStateInformation.entrySet()) {
+
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo =
stateMetaInfoEntry.getValue().f1;
+
+ KeyedBackendSerializationProxy.StateMetaInfo<?,
?> metaInfoProxy =
+ new
KeyedBackendSerializationProxy.StateMetaInfo<>(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+
metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
+
+ stateMetaInfos.add(metaInfoProxy);
+ }
+
+ // save state data
+ backupPath = new
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+ backupFileSystem = backupPath.getFileSystem();
+ if (backupFileSystem.exists(backupPath)) {
+ LOG.warn("Deleting an existing local checkpoint
directory " +
+ backupPath + ".");
+
+ backupFileSystem.delete(backupPath, true);
+ }
+
+ // create hard links of living files in the checkpoint
path
+ Checkpoint checkpoint =
Checkpoint.create(stateBackend.db);
+ checkpoint.createCheckpoint(backupPath.getPath());
+ }
+
+ KeyedStateHandle materializeSnapshot() throws Exception {
+ // write meta data
+ metaStateHandle = materializeMetaData();
+
+ // write state data
+
Preconditions.checkState(backupFileSystem.exists(backupPath));
+
+ FileStatus[] fileStatuses =
backupFileSystem.listStatus(backupPath);
+ if (fileStatuses != null ) {
+ for (FileStatus fileStatus : fileStatuses) {
+ Path filePath = fileStatus.getPath();
+ String fileName = filePath.getName();
+
+ if (fileName.endsWith(SST_FILE_SUFFIX))
{
+ StreamStateHandle fileHandle =
+ baseSstFiles == null ?
null : baseSstFiles.get(fileName);
+
+ if (fileHandle == null) {
+
newSstFileNames.add(fileName);
+ fileHandle =
materializeStateData(filePath);
+ }
+
+ sstFiles.put(fileName,
fileHandle);
+ } else {
+ StreamStateHandle fileHandle =
materializeStateData(filePath);
+ miscFiles.put(fileName,
fileHandle);
+ }
+ }
+ }
+
+ stateBackend.materializedSstFiles.put(checkpointId,
sstFiles);
+
+ return new RocksDBKeyedStateHandle(stateBackend.jobId,
+ stateBackend.operatorIdentifier,
stateBackend.keyGroupRange,
+ newSstFileNames, sstFiles, miscFiles,
metaStateHandle);
+ }
+
+ void releaseResources(boolean canceled) {
+
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ try {
+ inputStream.close();
+ } catch (Exception e) {
+ LOG.warn("Could not properly close the
input stream.", e);
+ }
+ inputStream = null;
+ }
+
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ try {
+ outputStream.close();
+ } catch (Exception e) {
+ LOG.warn("Could not properly close the
output stream.", e);
+ }
+ outputStream = null;
+ }
+
+ if (backupPath != null) {
+ try {
+ if
(backupFileSystem.exists(backupPath)) {
+
backupFileSystem.delete(backupPath, true);
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not properly delete the
checkpoint directory.", e);
+ }
+ }
+
+ if (canceled) {
+ List<StateObject> statesToDiscard = new
ArrayList<>();
+
+ if (metaStateHandle != null) {
+ statesToDiscard.add(metaStateHandle);
+ }
+
+ statesToDiscard.addAll(miscFiles.values());
+
+ for (String newSstFileName : newSstFileNames) {
+ StreamStateHandle fileHandle =
sstFiles.get(newSstFileName);
+ if (fileHandle != null) {
+ statesToDiscard.add(fileHandle);
+ }
+ }
+
+ try {
+
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
--- End diff --
The same comment about cleanup also applies, to some extent, to the local
checkpoint directories.
> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because
> RocksDB is based on LSM trees, which record updates in new sst files and all
> sst files are immutable. By only materializing those new sst files, we can
> significantly improve the performance of checkpointing.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)