rkhachatryan commented on a change in pull request #17179:
URL: https://github.com/apache/flink/pull/17179#discussion_r750618981



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
##########
@@ -38,7 +41,7 @@
     public static <T extends CompletedCheckpointStore>
             CheckpointRecoveryFactory withoutCheckpointStoreRecovery(IntFunction<T> storeFn) {
         return new PerJobCheckpointRecoveryFactory<>(
-                (maxCheckpoints, previous) -> {
+                (maxCheckpoints, previous, sharedStateRegistry) -> {

Review comment:
       I guess `previous` is optional during the first invocation?
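
   To make the question concrete, here is a minimal sketch, assuming the helper is driven by a per-job map. `RecoverySketch`, `Store` and `StoreRecoveryHelper` are hypothetical stand-ins, not the Flink classes, and the registry argument from the diff is left out. With this shape, the helper would see `null` as `previous` on the first call for a job:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; none of these are the actual Flink types.
class RecoverySketch {

    /** Minimal store: remembers its capacity and whether a predecessor existed. */
    static final class Store {
        final int maxCheckpoints;
        final boolean hadPrevious;

        Store(int maxCheckpoints, boolean hadPrevious) {
            this.maxCheckpoints = maxCheckpoints;
            this.hadPrevious = hadPrevious;
        }
    }

    /** Mirrors the shape of the lambda in the diff, minus the registry argument. */
    interface StoreRecoveryHelper {
        Store recover(int maxCheckpoints, Store previous); // previous may be null
    }

    private final Map<String, Store> stores = new HashMap<>();
    private final StoreRecoveryHelper helper;

    RecoverySketch(StoreRecoveryHelper helper) {
        this.helper = helper;
    }

    Store createStore(String jobId, int maxCheckpoints) {
        // Map.compute passes null as the old value when the job has no store yet,
        // so the helper must tolerate a null `previous` on the first call.
        return stores.compute(jobId, (key, previous) -> helper.recover(maxCheckpoints, previous));
    }
}
```

   If that assumption holds, the helper needs an explicit null check (or an `Optional`-style parameter) before it touches `previous`.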

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2974,6 +2971,137 @@ public void testSharedStateRegistrationOnRestore() throws Exception {
         }
     }
 
+    @Test
+    public void testSharedStateRegistrationWithoutRebuildSharedStateRegistry() 
throws Exception {

Review comment:
       I agree with Dawid: it tests the existing functionality, but not the one that was changed.
   
   I think `CheckpointRecoveryFactory` implementations should be unit-tested, and ideally their use by schedulers too (maybe using `TestingCheckpointRecoveryFactory`).
   WDYT?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
##########
@@ -31,10 +35,17 @@
      * @param jobId Job ID to recover checkpoints for
      * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
      * @param userClassLoader User code class loader of the job
+     * @param sharedStateRegistryFactory Simple factory to produce {@link SharedStateRegistry}
+     *     objects.
+     * @param ioExecutor Executor used to run (async) deletes.
      * @return {@link CompletedCheckpointStore} instance for the job
      */
     CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
-            JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
+            JobID jobId,
+            int maxNumberOfCheckpointsToRetain,
+            ClassLoader userClassLoader,
+            SharedStateRegistryFactory sharedStateRegistryFactory,

Review comment:
       I think it could also be passed as a constructor parameter to 
`CheckpointRecoveryFactory` implementations, but such a factory reduces 
coupling (as opposed to directly constructing the registry inside 
`CheckpointRecoveryFactory`).

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1520,24 +1509,6 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
                 throw new IllegalStateException("CheckpointCoordinator is shut down");
             }
 
-            // We create a new shared state registry object, so that all pending async disposal
-            // requests from previous runs will go against the old object (were they can do no
-            // harm). This must happen under the checkpoint lock.
-            sharedStateRegistry.close();

Review comment:
       Is this call now missing, so that the old registry is never closed?
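
   For reference, a minimal sketch of the behaviour the removed lines provided, with hypothetical stand-in types (`RegistrySwapSketch` and its `Registry` are not the Flink classes). The old registry is closed under the lock before the new one is installed:

```java
// Hypothetical stand-ins; not the Flink classes.
class RegistrySwapSketch {

    static final class Registry implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    private final Object lock = new Object();
    private Registry current = new Registry();

    Registry current() {
        synchronized (lock) {
            return current;
        }
    }

    /** Closes the old registry before installing the new one, under the lock. */
    Registry replaceWith(Registry fresh) {
        synchronized (lock) {
            Registry old = current;
            old.close(); // late disposal requests now go against a closed registry
            current = fresh;
            return old;
        }
    }
}
```

   If the registry is now created elsewhere, whoever replaces it would need an equivalent `close()` on the old instance.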




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

