rkhachatryan commented on code in PR #24168:
URL: https://github.com/apache/flink/pull/24168#discussion_r1462146343


##########
docs/content/docs/ops/traces.md:
##########
@@ -159,6 +159,10 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
       <td>(Max/Sum)DownloadStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
       <td>The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS.</td>
     </tr>
+    <tr>
+      <td>(Max/Sum)LocalRestoreStateDuration<br><br>(optional - currently only supported by RocksDB Incremental)</td>
+      <td>The aggregated (max and sum) across all subtasks duration of rescaling and restoring from local state, i.e. the part of restore after all remote state was downloaded.</td>

Review Comment:
   > duration of rescaling and restoring from local state
   
   Doesn't it include restoring the DB from both local and remote state handles, since remote handles are first "converted" to local ones?
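   To illustrate the point, here is a minimal sketch under simplified assumptions: `RestoreFlowSketch`, `RemoteHandle` and `LocalHandle` are hypothetical stand-ins, not Flink classes, and the actual DB restore work is replaced by a placeholder. Remote handles are first turned into local handles pointing at their download directories, and the timed phase then iterates over the combined list, so its duration would cover state that originated from both kinds of handles.

```java
import java.util.ArrayList;
import java.util.List;

class RestoreFlowSketch {

    // Hypothetical stand-in for a remote (DFS) keyed state handle.
    static final class RemoteHandle {
        final String path;

        RemoteHandle(String path) {
            this.path = path;
        }
    }

    // Hypothetical stand-in for a local keyed state handle; wasDownloaded marks
    // handles created from a remote handle after its files were downloaded.
    static final class LocalHandle {
        final String dir;
        final boolean wasDownloaded;

        LocalHandle(String dir, boolean wasDownloaded) {
            this.dir = dir;
            this.wasDownloaded = wasDownloaded;
        }
    }

    // Duration of the last timed restore phase, in milliseconds.
    static long lastRestorePhaseMs;

    /** Returns the handles that the timed restore phase processed. */
    static List<LocalHandle> restore(List<LocalHandle> localHandles, List<RemoteHandle> remoteHandles) {
        List<LocalHandle> all = new ArrayList<>(localHandles);

        // Download phase (measured separately): each remote handle is "converted"
        // into a local handle that points at its download directory.
        for (RemoteHandle remote : remoteHandles) {
            all.add(new LocalHandle("/tmp/download/" + remote.path, true));
        }

        // Timed restore phase: iterates over ALL local handles, including the
        // ones that were converted from remote handles above.
        long start = System.nanoTime();
        List<LocalHandle> processed = new ArrayList<>();
        for (LocalHandle handle : all) {
            processed.add(handle); // placeholder for rescaling / restoring into the DB
        }
        lastRestorePhaseMs = (System.nanoTime() - start) / 1_000_000;
        return processed;
    }
}
```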



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -302,114 +334,204 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
                 localKeyedStateHandles.add((IncrementalLocalKeyedStateHandle) stateHandle);
             } else {
                 throw unexpectedStateHandleException(
-                        IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass());
+                        EXPECTED_STATE_HANDLE_CLASSES, stateHandle.getClass());
             }
         }
 
         allDownloadSpecs.stream()
                 .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState)
                 .forEach(localKeyedStateHandles::add);
 
+        try {
+            // Process all state downloads
+            transferRemoteStateToLocalDirectory(allDownloadSpecs);
+            rescaleAndRestoreFromLocalState(localKeyedStateHandles);
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
+        }
+    }
+
+    private void rescaleAndRestoreFromLocalState(
+            List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception {
+
+        // Transfer remaining key-groups from temporary instance into base DB
+        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+
+        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
+
+        runAndReportDuration(
+                () -> {
+                    if (useIngestDbRestoreMode) {
+                        // Optimized path with Ingest/Clip
+                        rescaleClipIngestDB(

Review Comment:
   nit: How about moving `runAndReportDuration` to the top of this method, so that the key-group prefix serialization is also included in the reported duration?
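   A minimal sketch of the suggestion, assuming `runAndReportDuration` takes a throwing runnable plus a metric name. The PR's actual signature is not shown in this hunk, so `ThrowingRunnable`, `runAndReportDuration` and `serializeKeyGroup` below are hypothetical stand-ins (the latter modeled on what `CompositeKeySerializationUtils.serializeKeyGroup` appears to do: write the key-group id big-endian into the prefix array). With the timed region opened first, the prefix serialization is counted together with the rescale/restore work.

```java
import java.util.HashMap;
import java.util.Map;

class TimedRestoreSketch {

    // Hypothetical functional interface; the PR's helper may use a different one.
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Collected durations keyed by metric name (stand-in for real metric reporting).
    static final Map<String, Long> reportedMs = new HashMap<>();

    // Hypothetical helper: runs the body and records its wall-clock duration in ms.
    static void runAndReportDuration(ThrowingRunnable body, String metricName) throws Exception {
        long start = System.nanoTime();
        try {
            body.run();
        } finally {
            reportedMs.put(metricName, (System.nanoTime() - start) / 1_000_000);
        }
    }

    // Stand-in for CompositeKeySerializationUtils.serializeKeyGroup:
    // writes keyGroup big-endian into prefix (typically 1 or 2 bytes).
    static void serializeKeyGroup(int keyGroup, byte[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            prefix[i] = (byte) (keyGroup >>> (8 * (prefix.length - 1 - i)));
        }
    }

    // Returns {startPrefix, stopPrefix} so the serialized bytes can be inspected.
    static byte[][] rescaleAndRestore(int startKeyGroup, int endKeyGroup, int keyGroupPrefixBytes)
            throws Exception {
        byte[] startPrefix = new byte[keyGroupPrefixBytes];
        byte[] stopPrefix = new byte[keyGroupPrefixBytes];
        // The timed region now starts before serialization, as the nit suggests.
        runAndReportDuration(
                () -> {
                    serializeKeyGroup(startKeyGroup, startPrefix);
                    serializeKeyGroup(endKeyGroup + 1, stopPrefix);
                    // ... clip/ingest or copy of key groups into the base DB would follow here ...
                },
                "LocalRestoreStateDuration");
        return new byte[][] {startPrefix, stopPrefix};
    }
}
```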



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java:
##########
@@ -69,6 +69,7 @@ private MetricNames() {}
     public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs";
     public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs";
     public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs";
+    public static final String LOCAL_RESTORE_STATE_DURATION = "LocalRestoreStateDuration";

Review Comment:
   1. Add an `Ms` suffix, analogous to `DownloadStateDurationMs` above?
   2. Although "Local" is correct, it might be misleading: users may infer that it's about locally recovered state. How about dropping it from both the constant and the metric name?
   
   (This would require updating the docs and tests, I guess.)
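   For illustration only: if both suggestions were applied, the constant would presumably become something like `RESTORE_STATE_DURATION = "RestoreStateDurationMs"`, and the docs row would read `(Max/Sum)RestoreStateDurationMs`. The sketch below (class and method names are hypothetical, not Flink API) shows how per-subtask durations would then surface under max/sum names, mirroring the existing `(Max/Sum)DownloadStateDurationMs` convention in the traces docs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RestoreMetricNamesSketch {

    // Hypothetical renamed constant: "Local" dropped, "Ms" suffix added.
    static final String RESTORE_STATE_DURATION = "RestoreStateDurationMs";

    // Aggregates per-subtask durations into "Max<name>" and "Sum<name>" entries,
    // following the (Max/Sum) naming used in the traces docs table.
    // Durations are assumed non-negative, so 0 is a safe starting max.
    static Map<String, Long> aggregate(String baseName, List<Long> perSubtaskMs) {
        long max = 0;
        long sum = 0;
        for (long ms : perSubtaskMs) {
            max = Math.max(max, ms);
            sum += ms;
        }
        Map<String, Long> result = new LinkedHashMap<>();
        result.put("Max" + baseName, max);
        result.put("Sum" + baseName, sum);
        return result;
    }
}
```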



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
