dawidwys commented on a change in pull request #14648:
URL: https://github.com/apache/flink/pull/14648#discussion_r567917044
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
##########
@@ -124,129 +96,63 @@ public RocksDBFullRestoreOperation(
writeBufferManagerCapacity);
checkArgument(writeBatchSize >= 0, "Write batch size have to be no
negative.");
this.writeBatchSize = writeBatchSize;
+ this.savepointRestoreOperation =
+ new SavepointRestoreOperation<>(
+ keyGroupRange,
+ cancelStreamRegistry,
+ userCodeClassLoader,
+ restoreStateHandles,
+ keySerializerProvider);
}
/** Restores all key-groups data that is referenced by the passed state
handles. */
@Override
public RocksDBRestoreResult restore()
throws IOException, StateMigrationException, RocksDBException {
openDB();
- for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
- if (keyedStateHandle != null) {
-
- if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
- throw unexpectedStateHandleException(
- KeyGroupsStateHandle.class,
keyedStateHandle.getClass());
- }
- this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle)
keyedStateHandle;
- restoreKeyGroupsInStateHandle();
- }
+ try (ThrowingIterator<SavepointRestoreResult> restore =
+ savepointRestoreOperation.restore()) {
+ applyRestoreResult(restore.next());
}
return new RocksDBRestoreResult(
this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1,
null, null);
}
- /** Restore one key groups state handle. */
- private void restoreKeyGroupsInStateHandle()
- throws IOException, StateMigrationException, RocksDBException {
- try {
- logger.info("Starting to restore from state handle: {}.",
currentKeyGroupsStateHandle);
- currentStateHandleInStream =
currentKeyGroupsStateHandle.openInputStream();
- cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
- currentStateHandleInView = new
DataInputViewStreamWrapper(currentStateHandleInStream);
- restoreKVStateMetaData();
- restoreKVStateData();
- logger.info("Finished restoring from state handle: {}.",
currentKeyGroupsStateHandle);
- } finally {
- if
(cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
- IOUtils.closeQuietly(currentStateHandleInStream);
- }
- }
- }
-
- /**
- * Restore the KV-state / ColumnFamily meta data for all key-groups
referenced by the current
- * state handle.
- */
- private void restoreKVStateMetaData() throws IOException,
StateMigrationException {
- KeyedBackendSerializationProxy<K> serializationProxy =
- readMetaData(currentStateHandleInView);
-
- this.keygroupStreamCompressionDecorator =
- serializationProxy.isUsingKeyGroupCompression()
- ? SnappyStreamCompressionDecorator.INSTANCE
- : UncompressedStreamCompressionDecorator.INSTANCE;
-
+ private void applyRestoreResult(SavepointRestoreResult
savepointRestoreResult)
+ throws IOException, RocksDBException, StateMigrationException {
List<StateMetaInfoSnapshot> restoredMetaInfos =
- serializationProxy.getStateMetaInfoSnapshots();
- currentStateHandleKVStateColumnFamilies = new
ArrayList<>(restoredMetaInfos.size());
-
- for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
- RocksDbKvStateInfo registeredStateCFHandle =
- getOrRegisterStateColumnFamilyHandle(null,
restoredMetaInfo);
-
currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
- }
+ savepointRestoreResult.getStateMetaInfoSnapshots();
+ List<ColumnFamilyHandle> columnFamilyHandles =
+ restoredMetaInfos.stream()
+ .map(
+ stateMetaInfoSnapshot -> {
+ RocksDbKvStateInfo registeredStateCFHandle
=
+
getOrRegisterStateColumnFamilyHandle(
+ null,
stateMetaInfoSnapshot);
+ return
registeredStateCFHandle.columnFamilyHandle;
+ })
+ .collect(Collectors.toList());
+ restoreKVStateData(savepointRestoreResult.getRestoredKeyGroups(),
columnFamilyHandles);
}
/**
* Restore the KV-state / ColumnFamily data for all key-groups referenced
by the current state
* handle.
*/
- private void restoreKVStateData() throws IOException, RocksDBException {
+ private void restoreKVStateData(
+ ThrowingIterator<KeyGroup> keyGroups, List<ColumnFamilyHandle>
columnFamilies)
+ throws IOException, RocksDBException, StateMigrationException {
// for all key-groups in the current state handle...
try (RocksDBWriteBatchWrapper writeBatchWrapper =
new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
- for (Tuple2<Integer, Long> keyGroupOffset :
- currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
- int keyGroup = keyGroupOffset.f0;
-
- // Check that restored key groups all belong to the backend
- Preconditions.checkState(
- keyGroupRange.contains(keyGroup),
- "The key group must belong to the backend");
-
- long offset = keyGroupOffset.f1;
- // not empty key-group?
- if (0L != offset) {
- currentStateHandleInStream.seek(offset);
- try (InputStream compressedKgIn =
-
keygroupStreamCompressionDecorator.decorateWithCompression(
- currentStateHandleInStream)) {
- DataInputViewStreamWrapper compressedKgInputView =
- new DataInputViewStreamWrapper(compressedKgIn);
- // TODO this could be aware of keyGroupPrefixBytes and
write only one byte
- // if possible
- int kvStateId = compressedKgInputView.readShort();
- ColumnFamilyHandle handle =
-
currentStateHandleKVStateColumnFamilies.get(kvStateId);
- // insert all k/v pairs into DB
- boolean keyGroupHasMoreKeys = true;
- while (keyGroupHasMoreKeys) {
- byte[] key =
-
BytePrimitiveArraySerializer.INSTANCE.deserialize(
- compressedKgInputView);
- byte[] value =
-
BytePrimitiveArraySerializer.INSTANCE.deserialize(
- compressedKgInputView);
- if (hasMetaDataFollowsFlag(key)) {
- // clear the signal bit in the key to make it
ready for insertion
- // again
- clearMetaDataFollowsFlag(key);
- writeBatchWrapper.put(handle, key, value);
- // TODO this could be aware of
keyGroupPrefixBytes and write only
- // one byte if possible
- kvStateId =
- END_OF_KEY_GROUP_MARK &
compressedKgInputView.readShort();
- if (END_OF_KEY_GROUP_MARK == kvStateId) {
- keyGroupHasMoreKeys = false;
- } else {
- handle =
currentStateHandleKVStateColumnFamilies.get(kvStateId);
- }
- } else {
- writeBatchWrapper.put(handle, key, value);
- }
- }
- }
+ while (keyGroups.hasNext()) {
+ KeyGroup keyGroup = keyGroups.next();
+ ThrowingIterator<KeyGroupEntry> groupEntries =
keyGroup.getKeyGroupEntries();
+ while (groupEntries.hasNext()) {
+ // TODO we call it more times than before
Review comment:
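In the previous version we looked up the column family handle only
when the kvStateId changed. In the current version we look it up in the
`columnFamilies` list for every entry. Since this is an index lookup into a
list, the extra per-entry cost is probably small, but it is still more work
than before. I was torn on whether to optimize this here already, or whether
it is fine to leave it as is for now.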
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]