chenyuzhi459 commented on code in PR #855:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688138227
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws Except
LOG.info("Stopping failed Flink job...");
cleanupAfterFailedJob(ctx);
status.setError(null);
+ ReconciliationUtils.updateStatusForDeployedSpec(
+ ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock);
Review Comment:
Assume a Flink deployment is submitted to the flink-kubernetes-operator for
the first time with the following settings:
```
spec.job.upgradeMode=savepoint
spec.job.initialSavepointPath=null
spec.flinkConfiguration.execution.checkpointing.interval=60s
```
Here is the startup and failover process of the
flink-kubernetes-operator, based on my understanding:
1. At the first reconcile, in the method
[AbstractFlinkResourceReconciler.updateStatusBeforeFirstDeployment](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L224),
the `spec.job.upgradeMode` of the cloned deployment is set to STATELESS
because `spec.job.initialSavepointPath` is empty. This change is not written
back to the spec of the original deployment in k8s, so the original
deployment's `spec.job.upgradeMode` is still savepoint. The modified spec is
then serialized into `status.reconciliationStatus.lastReconciledSpec`, and
this status update is written back to the original deployment in k8s (I have
not yet looked into why that happens).
2. After running for a period of time, the deployment may encounter a
problem and exit with a failed status. The operator then saves the latest
checkpoint in `status.jobStatus.savepointInfo.lastSavepoint` of the original
deployment, in the method `SnapshotObserver.observeSavepointStatus`.
3. Then, in the method
[AbstractJobReconciler.resubmitJob](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L315),
the lastReconciledSpec of the original deployment is read into the
specToRecover variable and passed to the method
[AbstractJobReconciler.restore](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L260).
The method `AbstractJobReconciler.restore` decides whether to recover from
lastSavepoint based on whether the `spec.job.upgradeMode` of the
specToRecover variable is STATELESS. Before this fix, the upgradeMode here is
always STATELESS in this scenario, so the job is not recovered from
lastSavepoint.
Therefore, in the failover scenario, I think that serializing the original
deployment's `spec.job.upgradeMode=SAVEPOINT` to
`status.reconciliationStatus.lastReconciledSpec` before resubmitJob solves
this problem.
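To make the three steps and the proposed fix easier to check, here is a
minimal, self-contained sketch of the flow as I understand it. It is not the
operator's code: the class `UpgradeModeFailoverSketch`, its `JobSpec` type, all
method names, and the checkpoint path are hypothetical stand-ins, and the
behavior when `initialSavepointPath` is set is an assumption of the sketch.

```java
import java.util.Optional;

// Hypothetical, simplified model of the failover flow described above.
// None of these types or methods are the operator's real API.
final class UpgradeModeFailoverSketch {

    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    // Stand-in for the two spec.job fields that matter in this scenario.
    static final class JobSpec {
        final UpgradeMode upgradeMode;
        final String initialSavepointPath;

        JobSpec(UpgradeMode upgradeMode, String initialSavepointPath) {
            this.upgradeMode = upgradeMode;
            this.initialSavepointPath = initialSavepointPath;
        }
    }

    // Step 1: the spec stored as lastReconciledSpec before the first deployment.
    // With an empty initialSavepointPath the stored copy becomes STATELESS,
    // while the user's spec is left unchanged.
    static JobSpec specRecordedBeforeFirstDeployment(JobSpec userSpec) {
        String path = userSpec.initialSavepointPath;
        if (path == null || path.isEmpty()) {
            return new JobSpec(UpgradeMode.STATELESS, path);
        }
        // Assumption of this sketch: otherwise the user's mode is kept.
        return new JobSpec(userSpec.upgradeMode, path);
    }

    // Step 3: restore only uses lastSavepoint when the spec is not STATELESS.
    static Optional<String> restoreFrom(JobSpec specToRecover, String lastSavepoint) {
        if (specToRecover.upgradeMode == UpgradeMode.STATELESS) {
            return Optional.empty();
        }
        return Optional.ofNullable(lastSavepoint);
    }

    // Resubmit after a failure. With refreshFirst = true, lastReconciledSpec is
    // first replaced by the current user spec, which models the proposed fix.
    static Optional<String> resubmit(
            JobSpec userSpec, JobSpec lastReconciledSpec,
            String lastSavepoint, boolean refreshFirst) {
        JobSpec specToRecover = refreshFirst ? userSpec : lastReconciledSpec;
        return restoreFrom(specToRecover, lastSavepoint);
    }

    public static void main(String[] args) {
        JobSpec userSpec = new JobSpec(UpgradeMode.SAVEPOINT, null);
        JobSpec lastReconciledSpec = specRecordedBeforeFirstDeployment(userSpec);
        // Step 2: the observer recorded the latest checkpoint (hypothetical path).
        String lastSavepoint = "s3://bucket/checkpoints/chk-42";

        System.out.println("before fix: "
                + resubmit(userSpec, lastReconciledSpec, lastSavepoint, false));
        System.out.println("after fix:  "
                + resubmit(userSpec, lastReconciledSpec, lastSavepoint, true));
    }
}
```

In this model the resubmit without the refresh returns an empty `Optional`,
which means the job would restart without state, while the resubmit with the
refresh returns the recorded checkpoint path.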
I don’t know if there is something wrong with my understanding. If so, I
hope you can correct me. Thank you.