gyfora commented on code in PR #821:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667645251
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -266,19 +304,31 @@ protected void restoreJob(
Optional<String> savepointOpt = Optional.empty();
if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
- savepointOpt =
- Optional.ofNullable(
- ctx.getResource()
- .getStatus()
- .getJobStatus()
- .getSavepointInfo()
- .getLastSavepoint())
- .flatMap(s ->
Optional.ofNullable(s.getLocation()));
+ if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+ ctx.getOperatorConfig(), deployConfig)) {
+ savepointOpt =
getLatestSavepointPathFromFlinkStateSnapshots(ctx);
+ } else {
+ savepointOpt =
+ Optional.ofNullable(
+ ctx.getResource()
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint())
+ .flatMap(s ->
Optional.ofNullable(s.getLocation()));
Review Comment:
Actually I think the cleanest / simplest thing to do would be to add a new
status field like:
```
snapshotReference / upgradeSnapshotReference
```
This should be updated during SAVEPOINT deployments (when we start from
savepoint/checkpoint), after savepoint cancellation or when observing the last
state of a terminal job. (basically every time we currently update the
lastSavepoint for upgrade purposes
)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]