mateczagany commented on code in PR #821:
URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667707042


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -266,19 +304,31 @@ protected void restoreJob(
         Optional<String> savepointOpt = Optional.empty();
 
         if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
-            savepointOpt =
-                    Optional.ofNullable(
-                                    ctx.getResource()
-                                            .getStatus()
-                                            .getJobStatus()
-                                            .getSavepointInfo()
-                                            .getLastSavepoint())
-                            .flatMap(s -> Optional.ofNullable(s.getLocation()));
+            if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+                    ctx.getOperatorConfig(), deployConfig)) {
+                savepointOpt = getLatestSavepointPathFromFlinkStateSnapshots(ctx);
+            } else {
+                savepointOpt =
+                        Optional.ofNullable(
+                                        ctx.getResource()
+                                                .getStatus()
+                                                .getJobStatus()
+                                                .getSavepointInfo()
+                                                .getLastSavepoint())
+                                .flatMap(s -> Optional.ofNullable(s.getLocation()));

Review Comment:
   Thank you for explaining this problem so clearly. It makes perfect sense; I had overlooked this part of the operator.
   
   I think I could tackle this with a new field in the job status called `upgradeSnapshotReference`, as you proposed. However, this still leaves out an important case that was covered before: manual savepoints triggered by creating a new `FlinkStateSnapshot` resource, and periodic savepoints that create new `FlinkStateSnapshot` resources, would not update `upgradeSnapshotReference` in the FlinkDeployment/FlinkSessionJob status.
   
   I see 3 possible solutions:
   - Update the FlinkDeployment/FlinkSessionJob status from the snapshot controller. I agree that this is not an optimal pattern, and I would avoid it.
   - When re-deploying a Flink job, list all `FlinkStateSnapshot` resources in addition to checking `upgradeSnapshotReference`, compare the times the snapshots were taken, and use the most recent one. This might lead to confusion because, e.g., starting a suspended job might not use the savepoint shown in the status.
   - From the FlinkDeployment/FlinkSessionJob controllers, check the status of `FlinkStateSnapshot` resources with the help of `InformerEventSource`. I am not sure how viable this is.
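
   To make the second option more concrete, here is a rough, self-contained sketch of the "compare timestamps and use the most recent" step. It does not use the operator's real classes: `SnapshotCandidate` and `pickLatestSavepointPath` are hypothetical names invented for illustration, and the sketch assumes each completed snapshot exposes a path and the time it was taken.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestSnapshotSketch {

    /** Hypothetical stand-in for a completed savepoint: its path and when it was taken. */
    record SnapshotCandidate(String path, Instant takenAt) {}

    /**
     * Picks the savepoint path to restore from: the most recently taken candidate among
     * the (optional) upgradeSnapshotReference and the listed snapshot resources.
     */
    static Optional<String> pickLatestSavepointPath(
            Optional<SnapshotCandidate> upgradeSnapshotReference,
            List<SnapshotCandidate> completedSnapshots) {
        return Stream.concat(upgradeSnapshotReference.stream(), completedSnapshots.stream())
                .max(Comparator.comparing(SnapshotCandidate::takenAt))
                .map(SnapshotCandidate::path);
    }

    public static void main(String[] args) {
        Instant upgradeTime = Instant.parse("2024-07-08T10:00:00Z");
        SnapshotCandidate upgrade =
                new SnapshotCandidate("s3://bucket/upgrade-sp", upgradeTime);
        // A periodic savepoint taken one minute after the upgrade savepoint.
        SnapshotCandidate periodic =
                new SnapshotCandidate("s3://bucket/periodic-sp", upgradeTime.plusSeconds(60));

        // The periodic savepoint is newer, so it wins over the status reference.
        System.out.println(pickLatestSavepointPath(Optional.of(upgrade), List.of(periodic)));
    }
}
```

   In this example the restore path differs from the one recorded in the status, which is exactly the source of confusion mentioned for this option.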



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
