Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3801#discussion_r114363332
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws
IOException, RocksDBException {
}
}
+ private static class RocksDBIncrementalRestoreOperation {
+
+ private final RocksDBKeyedStateBackend<?> stateBackend;
+
+ private
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+ this.stateBackend = stateBackend;
+ }
+
+ private List<KeyedBackendSerializationProxy.StateMetaInfo<?,
?>> readMetaData(
+ StreamStateHandle metaStateHandle) throws
Exception {
+
+ FSDataInputStream inputStream = null;
+
+ try {
+ inputStream = metaStateHandle.openInputStream();
+
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+ KeyedBackendSerializationProxy
serializationProxy =
+ new
KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+ DataInputView in = new
DataInputViewStreamWrapper(inputStream);
+ serializationProxy.read(in);
+
+ return
serializationProxy.getNamedStateSerializationProxies();
+ } finally {
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ inputStream.close();
+ }
+ }
+ }
+
+ private void readStateData(
+ Path restoreFilePath,
+ StreamStateHandle remoteFileHandle) throws
IOException {
+
+ FileSystem restoreFileSystem =
restoreFilePath.getFileSystem();
+
+ FSDataInputStream inputStream = null;
+ FSDataOutputStream outputStream = null;
+
+ try {
+ inputStream =
remoteFileHandle.openInputStream();
+
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+ outputStream =
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ byte[] buffer = new byte[1024];
+ while (true) {
+ int numBytes = inputStream.read(buffer);
+ if (numBytes == -1) {
+ break;
+ }
+
+ outputStream.write(buffer, 0, numBytes);
+ }
+ } finally {
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ inputStream.close();
+ }
+
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ }
+ }
+ }
+
+ private void restoreInstance(
+ RocksDBKeyedStateHandle restoreStateHandle,
+ boolean hasExtraKeys) throws Exception {
+
+ // read state data
+ Path restoreInstancePath = new Path(
+ stateBackend.instanceBasePath.getAbsolutePath(),
+ UUID.randomUUID().toString());
+
+ try {
+ Map<String, StreamStateHandle> sstFiles =
restoreStateHandle.getSstFiles();
+ for (Map.Entry<String, StreamStateHandle>
sstFileEntry : sstFiles.entrySet()) {
+ String fileName = sstFileEntry.getKey();
+ StreamStateHandle remoteFileHandle =
sstFileEntry.getValue();
+
+ readStateData(new
Path(restoreInstancePath, fileName), remoteFileHandle);
+ }
+
+ Map<String, StreamStateHandle> miscFiles =
restoreStateHandle.getMiscFiles();
+ for (Map.Entry<String, StreamStateHandle>
miscFileEntry : miscFiles.entrySet()) {
+ String fileName =
miscFileEntry.getKey();
+ StreamStateHandle remoteFileHandle =
miscFileEntry.getValue();
+
+ readStateData(new
Path(restoreInstancePath, fileName), remoteFileHandle);
+ }
+
+ // read meta data
+
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
+
readMetaData(restoreStateHandle.getMetaStateHandle());
+
+ List<ColumnFamilyDescriptor>
columnFamilyDescriptors = new ArrayList<>();
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
+
+ for
(KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy :
stateMetaInfoProxies) {
+
+ ColumnFamilyDescriptor
columnFamilyDescriptor = new ColumnFamilyDescriptor(
+
stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+ stateBackend.columnOptions);
+
+
columnFamilyDescriptors.add(columnFamilyDescriptor);
+ }
+
+ if (hasExtraKeys) {
+
+ List<ColumnFamilyHandle>
columnFamilyHandles = new ArrayList<>();
+
+ RocksDB restoreDb = RocksDB.open(
--- End diff --
I suggest to use try-with-resources on the `restoreDb`. The instance is
never closed and i wonder if also the restore instance directory should be
deleted as part of some safety net cleanup hook (in the future?).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---