XComp commented on a change in pull request #17485:
URL: https://github.com/apache/flink/pull/17485#discussion_r786874117
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
##########
@@ -78,21 +84,46 @@ public CompletedCheckpointStore
createRecoveredCompletedCheckpointStore(
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor)
throws Exception {
+ final String configMapName = getConfigMapNameFunction.apply(jobID);
+ KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient,
configMapName, clusterId);
return KubernetesUtils.createCompletedCheckpointStore(
configuration,
kubeClient,
executor,
- getConfigMapNameFunction.apply(jobID),
+ configMapName,
lockIdentity,
maxNumberOfCheckpointsToRetain,
sharedStateRegistryFactory,
ioExecutor);
}
@Override
- public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) {
- return new KubernetesCheckpointIDCounter(
- kubeClient, getConfigMapNameFunction.apply(jobID),
lockIdentity);
+ public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws
Exception {
Review comment:
To me, it would feel more natural to have a subclass of
`KubernetesCheckpointRecoveryFactory` that takes care of the ConfigMap
creation. But I don't have a strong argument towards refactoring this code
because we wouldn't gain much from such a refactoring.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]