[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993384#comment-15993384 ]
ASF GitHub Bot commented on FLINK-6364:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3801#discussion_r114380645
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
 		}
 	}
+	private static class RocksDBIncrementalRestoreOperation {
+
+		private final RocksDBKeyedStateBackend<?> stateBackend;
+
+		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+			this.stateBackend = stateBackend;
+		}
+
+		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
+				StreamStateHandle metaStateHandle) throws Exception {
+
+			FSDataInputStream inputStream = null;
+
+			try {
+				inputStream = metaStateHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+				DataInputView in = new DataInputViewStreamWrapper(inputStream);
+				serializationProxy.read(in);
+
+				return serializationProxy.getNamedStateSerializationProxies();
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+			}
+		}
+
+		private void readStateData(
+				Path restoreFilePath,
+				StreamStateHandle remoteFileHandle) throws IOException {
+
+			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
+
+			FSDataInputStream inputStream = null;
+			FSDataOutputStream outputStream = null;
+
+			try {
+				inputStream = remoteFileHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				byte[] buffer = new byte[1024];
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+					if (numBytes == -1) {
+						break;
+					}
+
+					outputStream.write(buffer, 0, numBytes);
+				}
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+
+				if (outputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+					outputStream.close();
+				}
+			}
+		}
+
+		private void restoreInstance(
+				RocksDBKeyedStateHandle restoreStateHandle,
+				boolean hasExtraKeys) throws Exception {
+
+			// read state data
+			Path restoreInstancePath = new Path(
+				stateBackend.instanceBasePath.getAbsolutePath(),
+				UUID.randomUUID().toString());
+
+			try {
+				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
+				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
+					String fileName = sstFileEntry.getKey();
+					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
+
+					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
+				}
+
+				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
+				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
+					String fileName = miscFileEntry.getKey();
+					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
+
+					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
+				}
+
+				// read meta data
+				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
+					readMetaData(restoreStateHandle.getMetaStateHandle());
+
+				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
+				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
+
+				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
+
+					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
+						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+						stateBackend.columnOptions);
+
+					columnFamilyDescriptors.add(columnFamilyDescriptor);
+				}
+
+				if (hasExtraKeys) {
--- End diff --
I assume you are not using RocksDB's delete-range here so that we can support rescaling?
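
For context, a minimal sketch of the delete-range alternative alluded to above, not the approach taken in this PR. The class and helper names and the 2-byte key-group prefix are hypothetical, and it assumes a RocksDB version whose Java API exposes deleteRange; the PR instead filters entries from foreign key-groups while re-inserting, which is what makes rescaling possible without range deletes.

	import org.rocksdb.ColumnFamilyHandle;
	import org.rocksdb.RocksDB;
	import org.rocksdb.RocksDBException;

	final class KeyGroupPruningSketch {

		// Drops all key-groups outside [startKeyGroup, endKeyGroup] from one column
		// family, relying on keys being prefixed with their key-group id so that
		// whole out-of-range intervals can be removed with two range deletes.
		static void dropOutOfRangeKeyGroups(
				RocksDB db,
				ColumnFamilyHandle handle,
				int startKeyGroup,
				int endKeyGroup,
				int maxKeyGroup) throws RocksDBException {

			// deleteRange treats the begin key as inclusive and the end key as exclusive
			db.deleteRange(handle, prefix(0), prefix(startKeyGroup));
			db.deleteRange(handle, prefix(endKeyGroup + 1), prefix(maxKeyGroup + 1));
		}

		// Hypothetical helper: serializes the key-group id as a 2-byte big-endian
		// prefix, purely to illustrate how the range bounds could be formed.
		private static byte[] prefix(int keyGroup) {
			return new byte[] { (byte) (keyGroup >>> 8), (byte) keyGroup };
		}
	}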
> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because
> RocksDB is based on LSM trees, which record updates in new sst files, and all
> sst files are immutable. By materializing only the new sst files, we can
> significantly improve the performance of checkpointing.
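
A minimal sketch of that idea, with hypothetical names rather than the actual Flink implementation: each checkpoint uploads only the sst files that no earlier checkpoint has shipped and re-references the rest by their existing handles.

	import java.nio.file.Path;
	import java.util.HashMap;
	import java.util.List;
	import java.util.Map;

	final class IncrementalSnapshotSketch {

		// sst files already shipped by earlier checkpoints: file name -> remote handle
		private final Map<String, String> uploadedSstFiles = new HashMap<>();

		// localSstFiles lists the current immutable sst files of the RocksDB instance;
		// returns the complete file set referenced by this checkpoint
		Map<String, String> snapshot(List<Path> localSstFiles) throws Exception {
			Map<String, String> checkpointFiles = new HashMap<>();
			for (Path sst : localSstFiles) {
				String name = sst.getFileName().toString();
				String handle = uploadedSstFiles.get(name);
				if (handle == null) {
					// an unseen file name means new data that has to be materialized
					handle = upload(sst);
					uploadedSstFiles.put(name, handle);
				}
				checkpointFiles.put(name, handle);
			}
			// files dropped by compaction are no longer referenced by future checkpoints
			uploadedSstFiles.keySet().retainAll(checkpointFiles.keySet());
			return checkpointFiles;
		}

		// placeholder for copying a file to the checkpoint file system
		private String upload(Path file) throws Exception {
			return "checkpoint-fs://" + file.getFileName();
		}
	}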
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)