rkhachatryan commented on a change in pull request #17179:
URL: https://github.com/apache/flink/pull/17179#discussion_r750618981
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
##########
@@ -38,7 +41,7 @@
public static <T extends CompletedCheckpointStore>
CheckpointRecoveryFactory
withoutCheckpointStoreRecovery(IntFunction<T> storeFn) {
return new PerJobCheckpointRecoveryFactory<>(
- (maxCheckpoints, previous) -> {
+ (maxCheckpoints, previous, sharedStateRegistry) -> {
Review comment:
I guess `previous` is optional during the first invocation?
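To make the question concrete, here is a minimal sketch of how I read the per-job caching, assuming the previous store is simply `null` the first time a job is recovered. `Store`, `StoreRecoveryHelper` and `PerJobStoreCache` are hypothetical stand-ins for illustration, not the classes in this PR:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a completed checkpoint store. */
final class Store {
    final int maxCheckpoints;
    final Store previous; // null when there was no earlier store for the job

    Store(int maxCheckpoints, Store previous) {
        this.maxCheckpoints = maxCheckpoints;
        this.previous = previous;
    }
}

/** Hypothetical helper: receives the previous store, or null on the first invocation. */
interface StoreRecoveryHelper {
    Store recover(int maxCheckpoints, Store previousOrNull);
}

/** Hypothetical per-job cache that hands the last store back to the helper. */
final class PerJobStoreCache {
    private final Map<String, Store> stores = new HashMap<>();
    private final StoreRecoveryHelper helper;

    PerJobStoreCache(StoreRecoveryHelper helper) {
        this.helper = helper;
    }

    Store recover(String jobId, int maxCheckpoints) {
        // Map.get returns null for an unknown job, so the helper must tolerate null.
        Store previous = stores.get(jobId);
        Store current = helper.recover(maxCheckpoints, previous);
        stores.put(jobId, current);
        return current;
    }
}
```

If that reading is right, the lambda has to handle a `null` argument, and annotating the parameter accordingly would make the contract explicit.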
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2974,6 +2971,137 @@ public void testSharedStateRegistrationOnRestore()
throws Exception {
}
}
+ @Test
+ public void testSharedStateRegistrationWithoutRebuildSharedStateRegistry()
throws Exception {
Review comment:
I agree with Dawid: it tests the existing functionality, but not the one that
was changed.
I think `CheckpointRecoveryFactory` implementations should be unit-tested.
And ideally, their use by schedulers too (maybe using
`TestingCheckpointRecoveryFactory`).
WDYT?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
##########
@@ -31,10 +35,17 @@
* @param jobId Job ID to recover checkpoints for
* @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to
retain
* @param userClassLoader User code class loader of the job
+ * @param sharedStateRegistryFactory Simple factory to produce {@link SharedStateRegistry}
+ * objects.
+ * @param ioExecutor Executor used to run (async) deletes.
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
- JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
+ JobID jobId,
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader,
+ SharedStateRegistryFactory sharedStateRegistryFactory,
Review comment:
I think it could also be passed as a constructor parameter to
`CheckpointRecoveryFactory` implementations, but such a factory reduces
coupling (as opposed to directly constructing the registry inside
`CheckpointRecoveryFactory`).
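A rough sketch of the two options, only to illustrate the trade-off. All types here (`Registry`, `RegistryFactory`, `StoreWithRegistry`, and the two recovery factory variants) are hypothetical stand-ins, not the actual Flink classes:

```java
import java.util.concurrent.Executor;

/** Hypothetical stand-in for the shared state registry. */
final class Registry {
    final Executor ioExecutor;

    Registry(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }
}

/** Hypothetical factory: the caller decides which registry gets built. */
interface RegistryFactory {
    Registry create(Executor ioExecutor);
}

/** Hypothetical stand-in for a store that owns a registry. */
final class StoreWithRegistry {
    final Registry registry;

    StoreWithRegistry(Registry registry) {
        this.registry = registry;
    }
}

/** Variant A: the registry factory is a method parameter, as in this diff. */
final class MethodParamRecoveryFactory {
    StoreWithRegistry createStore(RegistryFactory registryFactory, Executor ioExecutor) {
        return new StoreWithRegistry(registryFactory.create(ioExecutor));
    }
}

/** Variant B: the registry factory is fixed once, at construction time. */
final class CtorParamRecoveryFactory {
    private final RegistryFactory registryFactory;

    CtorParamRecoveryFactory(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }

    StoreWithRegistry createStore(Executor ioExecutor) {
        return new StoreWithRegistry(registryFactory.create(ioExecutor));
    }
}
```

In both variants the recovery factory never calls `new Registry(...)` itself, which is the decoupling I mean; the variants differ only in where the choice of factory is made.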
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1520,24 +1509,6 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
throw new IllegalStateException("CheckpointCoordinator is shut down");
}
- // We create a new shared state registry object, so that all pending async disposal
- // requests from previous runs will go against the old object (were they can do no
- // harm). This must happen under the checkpoint lock.
- sharedStateRegistry.close();
Review comment:
Is this call now missing, so that the old registry is never closed?