[
https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497725#comment-16497725
]
ASF GitHub Bot commented on FLINK-8790:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5582#discussion_r192330418
--- Diff:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -795,20 +806,241 @@ private void restoreInstance(
}
/**
- * Recovery from local incremental state.
+ * Recovery from multi incremental states.
+ * In case of rescaling, this method creates a temporary
RocksDB instance for a key-groups shard. All contents
+ * from the temporary instance are copied into the real restore
instance and then the temporary instance is
+ * discarded.
*/
- private void restoreInstance(IncrementalLocalKeyedStateHandle
localKeyedStateHandle) throws Exception {
+ void restoreFromMultiHandles(Collection<KeyedStateHandle>
restoreStateHandles) throws Exception {
+
+ KeyGroupRange targetKeyGroupRange =
stateBackend.keyGroupRange;
+
+ chooseTheBestStateHandleToInit(restoreStateHandles,
targetKeyGroupRange);
+
+ int targetStartKeyGroup =
stateBackend.getKeyGroupRange().getStartKeyGroup();
+ byte[] targetStartKeyGroupPrefixBytes = new
byte[stateBackend.keyGroupPrefixBytes];
+ for (int j = 0; j < stateBackend.keyGroupPrefixBytes;
++j) {
+ targetStartKeyGroupPrefixBytes[j] = (byte)
(targetStartKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) *
Byte.SIZE));
+ }
+
+ for (KeyedStateHandle rawStateHandle :
restoreStateHandles) {
+
+ if (!(rawStateHandle instanceof
IncrementalKeyedStateHandle)) {
+ throw new
IllegalStateException("Unexpected state handle type, " +
+ "expected " +
IncrementalKeyedStateHandle.class +
+ ", but found " +
rawStateHandle.getClass());
+ }
+
+ Path temporaryRestoreInstancePath = new
Path(stateBackend.instanceBasePath.getAbsolutePath() +
UUID.randomUUID().toString());
+ try (RestoredDBInstance tmpRestoreDBInfo =
restoreDBFromStateHandle(
+ (IncrementalKeyedStateHandle)
rawStateHandle,
+ temporaryRestoreInstancePath,
+ targetKeyGroupRange,
+
stateBackend.keyGroupPrefixBytes,
+ false);
+ RocksDBWriteBatchWrapper
writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) {
+
+ List<ColumnFamilyDescriptor>
tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
+ List<ColumnFamilyHandle>
tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
+
+ int startKeyGroup =
stateBackend.getKeyGroupRange().getStartKeyGroup();
+ byte[] startKeyGroupPrefixBytes = new
byte[stateBackend.keyGroupPrefixBytes];
+ for (int j = 0; j <
stateBackend.keyGroupPrefixBytes; ++j) {
+ startKeyGroupPrefixBytes[j] =
(byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) *
Byte.SIZE));
+ }
+
+ // iterating only the requested
descriptors automatically skips the default column family handle
+ for (int i = 0; i <
tmpColumnFamilyDescriptors.size(); ++i) {
+ ColumnFamilyHandle
tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
+ ColumnFamilyDescriptor
tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i);
+
+ ColumnFamilyHandle
targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle(
+
tmpColumnFamilyDescriptor, null,
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));
+
+ try (RocksIteratorWrapper
iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
+
+
iterator.seek(startKeyGroupPrefixBytes);
+
+ while
(iterator.isValid()) {
+
+ int keyGroup =
0;
+ for (int j = 0;
j < stateBackend.keyGroupPrefixBytes; ++j) {
+
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
+ }
+
+ if
(stateBackend.keyGroupRange.contains(keyGroup)) {
+
writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(),
iterator.value());
+ } else {
+ //
Since the iterator will visit the record according to the sorted order,
+ // we
can just break here.
+ break;
+ }
+
+ iterator.next();
+ }
+ } // releases native iterator
resources
+ }
+ } finally {
+ FileSystem restoreFileSystem =
temporaryRestoreInstancePath.getFileSystem();
+ if
(restoreFileSystem.exists(temporaryRestoreInstancePath)) {
+
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
+ }
+ }
+ }
+ }
+
+ private class RestoredDBInstance implements AutoCloseable {
+
+ @Nonnull
+ private final RocksDB db;
+
+ @Nonnull
+ private final ColumnFamilyHandle
defaultColumnFamilyHandle;
+
+ @Nonnull
+ private final List<ColumnFamilyHandle>
columnFamilyHandles;
+
+ @Nonnull
+ private final List<ColumnFamilyDescriptor>
columnFamilyDescriptors;
+
+ @Nonnull
+ private final
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
+
+ public RestoredDBInstance(@Nonnull RocksDB db,
+ @Nonnull
List<ColumnFamilyHandle> columnFamilyHandles,
+ @Nonnull
List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+ @Nonnull
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>>
stateMetaInfoSnapshots) {
+ this.db = db;
+ this.columnFamilyHandles = columnFamilyHandles;
+ this.defaultColumnFamilyHandle =
this.columnFamilyHandles.remove(0);
+ this.columnFamilyDescriptors =
columnFamilyDescriptors;
+ this.stateMetaInfoSnapshots =
stateMetaInfoSnapshots;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ IOUtils.closeQuietly(defaultColumnFamilyHandle);
+
+ for (ColumnFamilyHandle columnFamilyHandle :
columnFamilyHandles) {
+
IOUtils.closeQuietly(columnFamilyHandle);
+ }
+
+ IOUtils.closeQuietly(db);
+ }
+ }
+
+ private RestoredDBInstance restoreDBFromStateHandle(
+ IncrementalKeyedStateHandle restoreStateHandle,
+ Path temporaryRestoreInstancePath,
+ KeyGroupRange targetKeyGroupRange,
+ int keyGroupPrefixBytes,
+ boolean needClip) throws Exception {
+
+ transferAllStateDataToDirectory(restoreStateHandle,
temporaryRestoreInstancePath);
+
// read meta data
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> stateMetaInfoSnapshots =
-
readMetaData(localKeyedStateHandle.getMetaDataState());
+
readMetaData(restoreStateHandle.getMetaStateHandle());
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
- restoreLocalStateIntoFullInstance(
- localKeyedStateHandle,
+ List<ColumnFamilyHandle> columnFamilyHandles =
+ new ArrayList<>(stateMetaInfoSnapshots.size() +
1);
+
+ RocksDB restoreDb = stateBackend.openDB(
+ temporaryRestoreInstancePath.getPath(),
columnFamilyDescriptors,
- stateMetaInfoSnapshots);
+ columnFamilyHandles);
+
+ if (needClip) {
+
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
+ restoreDb,
+ columnFamilyHandles,
+ targetKeyGroupRange,
+ restoreStateHandle.getKeyGroupRange(),
+ keyGroupPrefixBytes);
+ }
+
+ return new RestoredDBInstance(restoreDb,
columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
+ }
+
+ private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
+ ColumnFamilyDescriptor columnFamilyDescriptor,
+ ColumnFamilyHandle columnFamilyHandle,
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>
stateMetaInfoSnapshot) throws RocksDBException {
+
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
+
+ if (null == registeredStateMetaInfoEntry) {
+ RegisteredKeyedBackendStateMetaInfo<?, ?>
stateMetaInfo =
+ new
RegisteredKeyedBackendStateMetaInfo<>(
+
stateMetaInfoSnapshot.getStateType(),
+ stateMetaInfoSnapshot.getName(),
+
stateMetaInfoSnapshot.getNamespaceSerializer(),
+
stateMetaInfoSnapshot.getStateSerializer());
+
+ registeredStateMetaInfoEntry =
+ new Tuple2<>(
+ columnFamilyHandle != null ?
columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
+ stateMetaInfo);
+
+ stateBackend.kvStateInformation.put(
+ stateMetaInfoSnapshot.getName(),
+ registeredStateMetaInfoEntry);
+ }
+
+ return registeredStateMetaInfoEntry.f0;
+ }
+
+ private void chooseTheBestStateHandleToInit(
--- End diff --
I think the name of this method is no longer accurate: it does not only
choose the best handle, it also already restores a db instance. Maybe we can
still break this up into two methods, so that each method only does one thing.
I think it is not so nice that creating the db is a side effect of a method
that claims to only find something.
> Improve performance for recovery from incremental checkpoint
> ------------------------------------------------------------
>
> Key: FLINK-8790
> URL: https://issues.apache.org/jira/browse/FLINK-8790
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.6.0
>
>
> When there are multiple state handles to be restored, we can improve the
> performance as follows:
> 1. Choose the best state handle to init the target db
> 2. Use the other state handles to create temp dbs, and clip each db according
> to the target key group range (via rocksdb.deleteRange()); this can help us
> get rid of the `key group check` in the
> `data insertion loop` and also help us get rid of traversing the useless
> records.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)