[
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993346#comment-15993346
]
ASF GitHub Bot commented on FLINK-6364:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3801#discussion_r114359825
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws
InterruptedException {
}
}
+ private static class RocksDBIncrementalSnapshotOperation {
+
+ private final RocksDBKeyedStateBackend<?> stateBackend;
+
+ private final CheckpointStreamFactory checkpointStreamFactory;
+
+ private final long checkpointId;
+
+ private final long checkpointTimestamp;
+
+ private Map<String, StreamStateHandle> baseSstFiles;
+
+ private List<KeyedBackendSerializationProxy.StateMetaInfo<?,
?>> stateMetaInfos = new ArrayList<>();
+
+ private FileSystem backupFileSystem;
+ private Path backupPath;
+
+ private FSDataInputStream inputStream = null;
+ private CheckpointStreamFactory.CheckpointStateOutputStream
outputStream = null;
+
+ // new sst files since the last completed checkpoint
+ private Set<String> newSstFileNames = new HashSet<>();
+
+ // handles to the sst files in the current snapshot
+ private Map<String, StreamStateHandle> sstFiles = new
HashMap<>();
+
+ // handles to the misc files in the current snapshot
+ private Map<String, StreamStateHandle> miscFiles = new
HashMap<>();
+
+ private StreamStateHandle metaStateHandle = null;
+
+ private RocksDBIncrementalSnapshotOperation(
+ RocksDBKeyedStateBackend<?> stateBackend,
+ CheckpointStreamFactory checkpointStreamFactory,
+ long checkpointId,
+ long checkpointTimestamp) {
+
+ this.stateBackend = stateBackend;
+ this.checkpointStreamFactory = checkpointStreamFactory;
+ this.checkpointId = checkpointId;
+ this.checkpointTimestamp = checkpointTimestamp;
+ }
+
+ private StreamStateHandle materializeStateData(Path filePath)
throws Exception {
+ try {
+ final byte[] buffer = new byte[1024];
+
+ FileSystem backupFileSystem =
backupPath.getFileSystem();
+ inputStream = backupFileSystem.open(filePath);
+
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ while (true) {
+ int numBytes = inputStream.read(buffer);
+
+ if (numBytes == -1) {
+ break;
+ }
+
+ outputStream.write(buffer, 0, numBytes);
+ }
+
+ return outputStream.closeAndGetHandle();
+ } finally {
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ inputStream.close();
+ inputStream = null;
+ }
+
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+ }
+
+ private StreamStateHandle materializeMetaData() throws
Exception {
+ try {
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ KeyedBackendSerializationProxy
serializationProxy =
+ new
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+ DataOutputView out = new
DataOutputViewStreamWrapper(outputStream);
+
+ serializationProxy.write(out);
+
+ return outputStream.closeAndGetHandle();
+ } finally {
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+ }
+
+ void takeSnapshot() throws Exception {
+ // use the last completed checkpoint as the comparison
base.
+ baseSstFiles =
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+
+ // save meta data
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle,
RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry :
stateBackend.kvStateInformation.entrySet()) {
+
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo =
stateMetaInfoEntry.getValue().f1;
+
+ KeyedBackendSerializationProxy.StateMetaInfo<?,
?> metaInfoProxy =
+ new
KeyedBackendSerializationProxy.StateMetaInfo<>(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+
metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
+
+ stateMetaInfos.add(metaInfoProxy);
+ }
+
+ // save state data
+ backupPath = new
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+ backupFileSystem = backupPath.getFileSystem();
+ if (backupFileSystem.exists(backupPath)) {
+ LOG.warn("Deleting an existing local checkpoint
directory " +
+ backupPath + ".");
+
+ backupFileSystem.delete(backupPath, true);
+ }
+
+ // create hard links of living files in the checkpoint
path
+ Checkpoint checkpoint =
Checkpoint.create(stateBackend.db);
+ checkpoint.createCheckpoint(backupPath.getPath());
+ }
+
+ KeyedStateHandle materializeSnapshot() throws Exception {
+ // write meta data
+ metaStateHandle = materializeMetaData();
+
+ // write state data
+
Preconditions.checkState(backupFileSystem.exists(backupPath));
+
+ FileStatus[] fileStatuses =
backupFileSystem.listStatus(backupPath);
+ if (fileStatuses != null ) {
+ for (FileStatus fileStatus : fileStatuses) {
+ Path filePath = fileStatus.getPath();
+ String fileName = filePath.getName();
+
+ if (fileName.endsWith(SST_FILE_SUFFIX))
{
+ StreamStateHandle fileHandle =
+ baseSstFiles == null ?
null : baseSstFiles.get(fileName);
+
+ if (fileHandle == null) {
+
newSstFileNames.add(fileName);
+ fileHandle =
materializeStateData(filePath);
+ }
+
+ sstFiles.put(fileName,
fileHandle);
+ } else {
+ StreamStateHandle fileHandle =
materializeStateData(filePath);
+ miscFiles.put(fileName,
fileHandle);
+ }
+ }
+ }
+
+ stateBackend.materializedSstFiles.put(checkpointId,
sstFiles);
+
+ return new RocksDBKeyedStateHandle(stateBackend.jobId,
+ stateBackend.operatorIdentifier,
stateBackend.keyGroupRange,
+ newSstFileNames, sstFiles, miscFiles,
metaStateHandle);
+ }
+
+ void releaseResources(boolean canceled) {
+
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ try {
+ inputStream.close();
+ } catch (Exception e) {
+ LOG.warn("Could not properly close the
input stream.", e);
+ }
+ inputStream = null;
+ }
+
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ try {
+ outputStream.close();
+ } catch (Exception e) {
+ LOG.warn("Could not properly close the
output stream.", e);
+ }
+ outputStream = null;
+ }
+
+ if (backupPath != null) {
+ try {
+ if
(backupFileSystem.exists(backupPath)) {
+
backupFileSystem.delete(backupPath, true);
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not properly delete the
checkpoint directory.", e);
+ }
+ }
+
+ if (canceled) {
+ List<StateObject> statesToDiscard = new
ArrayList<>();
+
+ if (metaStateHandle != null) {
+ statesToDiscard.add(metaStateHandle);
+ }
+
+ statesToDiscard.addAll(miscFiles.values());
+
+ for (String newSstFileName : newSstFileNames) {
+ StreamStateHandle fileHandle =
sstFiles.get(newSstFileName);
+ if (fileHandle != null) {
+ statesToDiscard.add(fileHandle);
+ }
+ }
+
+ try {
+
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
--- End diff --
This looks like we should also have some safety net mechanism or global
cleanup hook in case an exception happens or the node goes down between
cancellation and triggering of the discard. In this case, all knowledge about
the created files is lost. I wonder if the shared state registry should perform
cleanups at startup (maybe later also after incremental recoveries) for
unreferenced files in the shared directory. However, this sounds like it could
be an expensive operation on some file systems.
> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because
> RocksDB is based on LSM trees, which record updates in new sst files and all
> sst files are immutable. By only materializing those new sst files, we can
> significantly improve the performance of checkpointing.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)