[ https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497733#comment-16497733 ]
ASF GitHub Bot commented on FLINK-8790: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5582#discussion_r192331370 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -795,20 +806,241 @@ private void restoreInstance( } /** - * Recovery from local incremental state. + * Recovery from multi incremental states. + * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. All contents + * from the temporary instance are copied into the real restore instance and then the temporary instance is + * discarded. */ - private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { + void restoreFromMultiHandles(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { + + KeyGroupRange targetKeyGroupRange = stateBackend.keyGroupRange; + + chooseTheBestStateHandleToInit(restoreStateHandles, targetKeyGroupRange); + + int targetStartKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] targetStartKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + targetStartKeyGroupPrefixBytes[j] = (byte) (targetStartKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { + + if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); + } + + Path temporaryRestoreInstancePath = new Path(stateBackend.instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString()); + try (RestoredDBInstance tmpRestoreDBInfo = restoreDBFromStateHandle( + (IncrementalKeyedStateHandle) 
rawStateHandle, + temporaryRestoreInstancePath, + targetKeyGroupRange, + stateBackend.keyGroupPrefixBytes, + false); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) { + + List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors; + List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + // iterating only the requested descriptors automatically skips the default column family handle + for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i); + ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i); + + ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle( + tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i)); + + try (RocksIteratorWrapper iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) { + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } + + if (stateBackend.keyGroupRange.contains(keyGroup)) { + writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value()); + } else { + // Since the iterator will visit the record according to the sorted order, + // we can just break here. 
+ break; + } + + iterator.next(); + } + } // releases native iterator resources + } + } finally { + FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { + restoreFileSystem.delete(temporaryRestoreInstancePath, true); + } + } + } + } + + private class RestoredDBInstance implements AutoCloseable { + + @Nonnull + private final RocksDB db; + + @Nonnull + private final ColumnFamilyHandle defaultColumnFamilyHandle; + + @Nonnull + private final List<ColumnFamilyHandle> columnFamilyHandles; + + @Nonnull + private final List<ColumnFamilyDescriptor> columnFamilyDescriptors; + + @Nonnull + private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots; + + public RestoredDBInstance(@Nonnull RocksDB db, + @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, + @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, + @Nonnull List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { + this.db = db; + this.columnFamilyHandles = columnFamilyHandles; + this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0); + this.columnFamilyDescriptors = columnFamilyDescriptors; + this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + } + + @Override + public void close() throws Exception { + + IOUtils.closeQuietly(defaultColumnFamilyHandle); + + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(columnFamilyHandle); + } + + IOUtils.closeQuietly(db); + } + } + + private RestoredDBInstance restoreDBFromStateHandle( + IncrementalKeyedStateHandle restoreStateHandle, + Path temporaryRestoreInstancePath, + KeyGroupRange targetKeyGroupRange, + int keyGroupPrefixBytes, + boolean needClip) throws Exception { + + transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath); + // read meta data List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = - 
readMetaData(localKeyedStateHandle.getMetaDataState()); + readMetaData(restoreStateHandle.getMetaStateHandle()); List<ColumnFamilyDescriptor> columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); - restoreLocalStateIntoFullInstance( - localKeyedStateHandle, + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(stateMetaInfoSnapshots.size() + 1); + + RocksDB restoreDb = stateBackend.openDB( + temporaryRestoreInstancePath.getPath(), columnFamilyDescriptors, - stateMetaInfoSnapshots); + columnFamilyHandles); + + if (needClip) { --- End diff -- Instead of using this clipping as a flag, would it not be better to just have a method that clips a `RestoredDBInstance` which is simply called after `restoreDBFromStateHandle` in the one case that needs it? This would avoid mixing up those two things in one method. > Improve performance for recovery from incremental checkpoint > ------------------------------------------------------------ > > Key: FLINK-8790 > URL: https://issues.apache.org/jira/browse/FLINK-8790 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major > Fix For: 1.6.0 > > > When there are multiple state handles to be restored, we can improve the > performance as follows: > 1. Choose the best state handle to init the target db > 2. Use the other state handles to create a temp db, and clip the db according > to the target key group range (via rocksdb.deleteRange()), this can help us > get rid of the `key group check` in > `data insertion loop` and also help us get rid of traversing the useless > records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)