dawidwys commented on a change in pull request #14648:
URL: https://github.com/apache/flink/pull/14648#discussion_r567917044



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
##########
@@ -124,129 +96,63 @@ public RocksDBFullRestoreOperation(
                 writeBufferManagerCapacity);
         checkArgument(writeBatchSize >= 0, "Write batch size have to be no 
negative.");
         this.writeBatchSize = writeBatchSize;
+        this.savepointRestoreOperation =
+                new SavepointRestoreOperation<>(
+                        keyGroupRange,
+                        cancelStreamRegistry,
+                        userCodeClassLoader,
+                        restoreStateHandles,
+                        keySerializerProvider);
     }
 
     /** Restores all key-groups data that is referenced by the passed state 
handles. */
     @Override
     public RocksDBRestoreResult restore()
             throws IOException, StateMigrationException, RocksDBException {
         openDB();
-        for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
-            if (keyedStateHandle != null) {
-
-                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
-                    throw unexpectedStateHandleException(
-                            KeyGroupsStateHandle.class, 
keyedStateHandle.getClass());
-                }
-                this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) 
keyedStateHandle;
-                restoreKeyGroupsInStateHandle();
-            }
+        try (ThrowingIterator<SavepointRestoreResult> restore =
+                savepointRestoreOperation.restore()) {
+            applyRestoreResult(restore.next());
         }
         return new RocksDBRestoreResult(
                 this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, 
null, null);
     }
 
-    /** Restore one key groups state handle. */
-    private void restoreKeyGroupsInStateHandle()
-            throws IOException, StateMigrationException, RocksDBException {
-        try {
-            logger.info("Starting to restore from state handle: {}.", 
currentKeyGroupsStateHandle);
-            currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
-            cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-            currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
-            restoreKVStateMetaData();
-            restoreKVStateData();
-            logger.info("Finished restoring from state handle: {}.", 
currentKeyGroupsStateHandle);
-        } finally {
-            if 
(cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
-                IOUtils.closeQuietly(currentStateHandleInStream);
-            }
-        }
-    }
-
-    /**
-     * Restore the KV-state / ColumnFamily meta data for all key-groups 
referenced by the current
-     * state handle.
-     */
-    private void restoreKVStateMetaData() throws IOException, 
StateMigrationException {
-        KeyedBackendSerializationProxy<K> serializationProxy =
-                readMetaData(currentStateHandleInView);
-
-        this.keygroupStreamCompressionDecorator =
-                serializationProxy.isUsingKeyGroupCompression()
-                        ? SnappyStreamCompressionDecorator.INSTANCE
-                        : UncompressedStreamCompressionDecorator.INSTANCE;
-
+    private void applyRestoreResult(SavepointRestoreResult 
savepointRestoreResult)
+            throws IOException, RocksDBException, StateMigrationException {
         List<StateMetaInfoSnapshot> restoredMetaInfos =
-                serializationProxy.getStateMetaInfoSnapshots();
-        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-
-        for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
-            RocksDbKvStateInfo registeredStateCFHandle =
-                    getOrRegisterStateColumnFamilyHandle(null, 
restoredMetaInfo);
-            
currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
-        }
+                savepointRestoreResult.getStateMetaInfoSnapshots();
+        List<ColumnFamilyHandle> columnFamilyHandles =
+                restoredMetaInfos.stream()
+                        .map(
+                                stateMetaInfoSnapshot -> {
+                                    RocksDbKvStateInfo registeredStateCFHandle 
=
+                                            
getOrRegisterStateColumnFamilyHandle(
+                                                    null, 
stateMetaInfoSnapshot);
+                                    return 
registeredStateCFHandle.columnFamilyHandle;
+                                })
+                        .collect(Collectors.toList());
+        restoreKVStateData(savepointRestoreResult.getRestoredKeyGroups(), 
columnFamilyHandles);
     }
 
     /**
      * Restore the KV-state / ColumnFamily data for all key-groups referenced 
by the current state
      * handle.
      */
-    private void restoreKVStateData() throws IOException, RocksDBException {
+    private void restoreKVStateData(
+            ThrowingIterator<KeyGroup> keyGroups, List<ColumnFamilyHandle> 
columnFamilies)
+            throws IOException, RocksDBException, StateMigrationException {
         // for all key-groups in the current state handle...
         try (RocksDBWriteBatchWrapper writeBatchWrapper =
                 new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
-            for (Tuple2<Integer, Long> keyGroupOffset :
-                    currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-                int keyGroup = keyGroupOffset.f0;
-
-                // Check that restored key groups all belong to the backend
-                Preconditions.checkState(
-                        keyGroupRange.contains(keyGroup),
-                        "The key group must belong to the backend");
-
-                long offset = keyGroupOffset.f1;
-                // not empty key-group?
-                if (0L != offset) {
-                    currentStateHandleInStream.seek(offset);
-                    try (InputStream compressedKgIn =
-                            
keygroupStreamCompressionDecorator.decorateWithCompression(
-                                    currentStateHandleInStream)) {
-                        DataInputViewStreamWrapper compressedKgInputView =
-                                new DataInputViewStreamWrapper(compressedKgIn);
-                        // TODO this could be aware of keyGroupPrefixBytes and 
write only one byte
-                        // if possible
-                        int kvStateId = compressedKgInputView.readShort();
-                        ColumnFamilyHandle handle =
-                                
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                        // insert all k/v pairs into DB
-                        boolean keyGroupHasMoreKeys = true;
-                        while (keyGroupHasMoreKeys) {
-                            byte[] key =
-                                    
BytePrimitiveArraySerializer.INSTANCE.deserialize(
-                                            compressedKgInputView);
-                            byte[] value =
-                                    
BytePrimitiveArraySerializer.INSTANCE.deserialize(
-                                            compressedKgInputView);
-                            if (hasMetaDataFollowsFlag(key)) {
-                                // clear the signal bit in the key to make it 
ready for insertion
-                                // again
-                                clearMetaDataFollowsFlag(key);
-                                writeBatchWrapper.put(handle, key, value);
-                                // TODO this could be aware of 
keyGroupPrefixBytes and write only
-                                // one byte if possible
-                                kvStateId =
-                                        END_OF_KEY_GROUP_MARK & 
compressedKgInputView.readShort();
-                                if (END_OF_KEY_GROUP_MARK == kvStateId) {
-                                    keyGroupHasMoreKeys = false;
-                                } else {
-                                    handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                }
-                            } else {
-                                writeBatchWrapper.put(handle, key, value);
-                            }
-                        }
-                    }
+            while (keyGroups.hasNext()) {
+                KeyGroup keyGroup = keyGroups.next();
+                ThrowingIterator<KeyGroupEntry> groupEntries = 
keyGroup.getKeyGroupEntries();
+                while (groupEntries.hasNext()) {
+                    // TODO we call it more times than before

Review comment:
       In the previous version we looked up the column family handle in the 
`columnFamilies` list only when the kvStateId changed. In the current version we 
perform that lookup for every entry. I was torn on whether we should already 
optimize this here, or whether it is fine to leave it as it is for the time being.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to