[
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993348#comment-15993348
]
ASF GitHub Bot commented on FLINK-6364:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3801#discussion_r114362074
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws
InterruptedException {
}
}
+ private static class RocksDBIncrementalSnapshotOperation {
+
+ private final RocksDBKeyedStateBackend<?> stateBackend;
+
+ private final CheckpointStreamFactory checkpointStreamFactory;
+
+ private final long checkpointId;
+
+ private final long checkpointTimestamp;
+
+ private Map<String, StreamStateHandle> baseSstFiles;
+
+ private List<KeyedBackendSerializationProxy.StateMetaInfo<?,
?>> stateMetaInfos = new ArrayList<>();
+
+ private FileSystem backupFileSystem;
+ private Path backupPath;
+
+ private FSDataInputStream inputStream = null;
+ private CheckpointStreamFactory.CheckpointStateOutputStream
outputStream = null;
+
+ // new sst files since the last completed checkpoint
+ private Set<String> newSstFileNames = new HashSet<>();
+
+ // handles to the sst files in the current snapshot
+ private Map<String, StreamStateHandle> sstFiles = new
HashMap<>();
+
+ // handles to the misc files in the current snapshot
+ private Map<String, StreamStateHandle> miscFiles = new
HashMap<>();
+
+ private StreamStateHandle metaStateHandle = null;
+
+ private RocksDBIncrementalSnapshotOperation(
+ RocksDBKeyedStateBackend<?> stateBackend,
+ CheckpointStreamFactory checkpointStreamFactory,
+ long checkpointId,
+ long checkpointTimestamp) {
+
+ this.stateBackend = stateBackend;
+ this.checkpointStreamFactory = checkpointStreamFactory;
+ this.checkpointId = checkpointId;
+ this.checkpointTimestamp = checkpointTimestamp;
+ }
+
+ private StreamStateHandle materializeStateData(Path filePath)
throws Exception {
+ try {
+ final byte[] buffer = new byte[1024];
+
+ FileSystem backupFileSystem =
backupPath.getFileSystem();
+ inputStream = backupFileSystem.open(filePath);
+
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ while (true) {
+ int numBytes = inputStream.read(buffer);
+
+ if (numBytes == -1) {
+ break;
+ }
+
+ outputStream.write(buffer, 0, numBytes);
+ }
+
+ return outputStream.closeAndGetHandle();
+ } finally {
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ inputStream.close();
+ inputStream = null;
+ }
+
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+ }
+
+ private StreamStateHandle materializeMetaData() throws
Exception {
+ try {
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ KeyedBackendSerializationProxy
serializationProxy =
+ new
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+ DataOutputView out = new
DataOutputViewStreamWrapper(outputStream);
+
+ serializationProxy.write(out);
+
+ return outputStream.closeAndGetHandle();
--- End diff --
Same comment about nulling the `outputStream` applies here.
> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because
> RocksDB is based on LSM trees, which record updates in new sst files and all
> sst files are immutable. By only materializing those new sst files, we can
> significantly improve the performance of checkpointing.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)