[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996218#comment-15996218 ]
ASF GitHub Bot commented on FLINK-6364:
---------------------------------------
Github user shixiaogang commented on a diff in the pull request:
https://github.com/apache/flink/pull/3801#discussion_r114703946
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
 		}
 	}
+	private static class RocksDBIncrementalRestoreOperation {
+
+		private final RocksDBKeyedStateBackend<?> stateBackend;
+
+		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+			this.stateBackend = stateBackend;
+		}
+
+		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
+				StreamStateHandle metaStateHandle) throws Exception {
+
+			FSDataInputStream inputStream = null;
+
+			try {
+				inputStream = metaStateHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+				DataInputView in = new DataInputViewStreamWrapper(inputStream);
+				serializationProxy.read(in);
+
+				return serializationProxy.getNamedStateSerializationProxies();
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+			}
+		}
+
+		private void readStateData(
+				Path restoreFilePath,
+				StreamStateHandle remoteFileHandle) throws IOException {
+
+			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
+
+			FSDataInputStream inputStream = null;
+			FSDataOutputStream outputStream = null;
+
+			try {
+				inputStream = remoteFileHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				byte[] buffer = new byte[1024];
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+					if (numBytes == -1) {
+						break;
+					}
+
+					outputStream.write(buffer, 0, numBytes);
+				}
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+
+				if (outputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+					outputStream.close();
+				}
+			}
+		}
+
+		private void restoreInstance(
+				RocksDBKeyedStateHandle restoreStateHandle,
+				boolean hasExtraKeys) throws Exception {
+
+			// read state data
+			Path restoreInstancePath = new Path(
+				stateBackend.instanceBasePath.getAbsolutePath(),
+				UUID.randomUUID().toString());
+
+			try {
+				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
+				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
+					String fileName = sstFileEntry.getKey();
+					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
+
+					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
+				}
+
+				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
+				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
+					String fileName = miscFileEntry.getKey();
+					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
+
+					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
+				}
+
+				// read meta data
+				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
+					readMetaData(restoreStateHandle.getMetaStateHandle());
+
+				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
+				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
+
+				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
+					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
+						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+						stateBackend.columnOptions);
+
+					columnFamilyDescriptors.add(columnFamilyDescriptor);
+				}
+
+				if (hasExtraKeys) {
--- End diff --
The optimization is targeted at deletions within the same instance. In our
case, we are moving key-value pairs from one instance to another, so the
optimizations for Delete-A-Range-Of-Keys may not work here. Multi-put may
help to improve the performance, but little improvement was observed in the
experiments.
Maybe we can leave it as it is and do the optimization in the future. What
do you think?
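For illustration, a multi-put on the RocksDB side would amount to buffering
the copied entries in a {{WriteBatch}} and writing them to the target instance
in chunks. The sketch below shows that shape; the names ({{restoredIterator}},
{{targetDb}}, {{columnFamilyHandle}}, {{BATCH_SIZE}}) are illustrative
assumptions, not identifiers from this PR.
{code:java}
// Illustrative sketch only, not code from this PR: copying entries across
// instances with a bounded WriteBatch instead of individual put() calls.
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

final class BatchedCopySketch {

	// flush threshold is an assumption; tune against memory and write amplification
	private static final int BATCH_SIZE = 500;

	static void copyAll(
			RocksIterator restoredIterator,
			RocksDB targetDb,
			ColumnFamilyHandle columnFamilyHandle) throws RocksDBException {

		try (WriteOptions writeOptions = new WriteOptions();
				WriteBatch batch = new WriteBatch()) {

			for (restoredIterator.seekToFirst(); restoredIterator.isValid(); restoredIterator.next()) {
				batch.put(columnFamilyHandle, restoredIterator.key(), restoredIterator.value());

				// write the batch out periodically to bound memory usage
				if (batch.count() >= BATCH_SIZE) {
					targetDb.write(writeOptions, batch);
					batch.clear();
				}
			}

			// write any remaining buffered entries
			if (batch.count() > 0) {
				targetDb.write(writeOptions, batch);
			}
		}
	}
}
{code}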
> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because
> RocksDB is based on LSM trees, which record updates in new sst files, and all
> sst files are immutable. By materializing only those new sst files, we can
> significantly improve the performance of checkpointing.
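As a minimal illustration of the idea above (not Flink's actual
implementation): because sst files are immutable, a checkpoint only needs to
upload the sst files that the previous checkpoint does not already reference.
{code:java}
// Minimal sketch of the incremental idea, not Flink's actual code: sst files
// are immutable, so only files absent from the previous checkpoint are uploaded.
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class IncrementalSstSelection {

	static Set<String> filesToUpload(
			Set<String> localSstFiles,                 // files in the current RocksDB instance
			Map<String, ?> previousCheckpointFiles) {  // file name -> handle from the last checkpoint

		return localSstFiles.stream()
			.filter(name -> !previousCheckpointFiles.containsKey(name))
			.collect(Collectors.toSet());
	}
}
{code}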