Aitozi commented on code in PR #283:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/283#discussion_r908605561
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -352,22 +382,33 @@ ErrorStatusUpdateControl<R> toErrorStatusUpdateControl(
return ErrorStatusUpdateControl.noStatusUpdate();
}
- public static Long getUpgradeTargetGeneration(FlinkDeployment deployment) {
+ /**
+ * Get spec generation for the current in progress upgrade.
+ *
+ * @param resource Flink resource.
+ * @return The spec generation for the upgrade.
+ */
+ public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> resource) {
var lastSpecWithMeta =
- deployment
- .getStatus()
+ resource.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta();
if (lastSpecWithMeta == null || lastSpecWithMeta.f1 == null) {
// For first deployments and when migrating from before this feature simply return
// current generation
- return deployment.getMetadata().getGeneration();
+ return resource.getMetadata().getGeneration();
}
return lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);
Review Comment:
The meta part of specWithMeta is now an `ObjectNode`, which has no schema
and is awkward to use. Do you think we should improve that here?
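To illustrate what a typed alternative might look like, here is a minimal sketch. `ReconciliationMeta` and `UpgradeTarget` are hypothetical names that do not exist in the operator, and the sketch assumes the generation is the only metadata field that needs to be read back. It keeps the fallback from the diff above: with no stored metadata the current generation is used, and a missing generation yields -1.

```java
import java.util.Optional;

// Hypothetical typed holder that could replace the untyped ObjectNode metadata.
// Neither the class name nor its field comes from the operator codebase.
final class ReconciliationMeta {
    // May be null for specs that were stored without a generation.
    private final Long generation;

    ReconciliationMeta(Long generation) {
        this.generation = generation;
    }

    Optional<Long> getGeneration() {
        return Optional.ofNullable(generation);
    }
}

// Hypothetical helper mirroring the fallback logic of getUpgradeTargetGeneration.
final class UpgradeTarget {
    private UpgradeTarget() {}

    static long resolve(ReconciliationMeta lastMeta, long currentGeneration) {
        if (lastMeta == null) {
            // First deployment or migration from an older version:
            // fall back to the current generation.
            return currentGeneration;
        }
        // Same default as asLong(-1L) in the diff when no generation is stored.
        return lastMeta.getGeneration().orElse(-1L);
    }
}
```

With something like this, callers would read `meta.getGeneration()` instead of chaining `get("metadata").get("generation")` on a JSON tree.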
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -279,21 +279,35 @@ private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
Long upgradeTargetGeneration = ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
+ Long currentSpecGeneration = flinkDep.getMetadata().getGeneration();
+
if (deployedGeneration.equals(upgradeTargetGeneration)) {
- logger.info(
- "Last reconciled generation is already deployed, setting reconciliation status to "
- + ReconciliationState.DEPLOYED);
+ logger.info("Pending upgrade is already deployed, updating status.");
var firstDeploy = status.getReconciliationStatus().getLastReconciledSpec() == null;
- var conf =
- firstDeploy
- ? configManager.getDeployConfig(
- flinkDep.getMetadata(), flinkDep.getSpec())
- : configManager.getObserveConfig(flinkDep);
+ if (firstDeploy) {
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ flinkDep,
+ JobState.RUNNING,
+ configManager.getDeployConfig(
+ flinkDep.getMetadata(), flinkDep.getSpec()));
+ } else {
+ ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDep);
+ }
+
+ status.getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+ } else if (deployedGeneration.equals(currentSpecGeneration)) {
Review Comment:
I think we have a problem here: we can only act when `deployedGeneration`
equals either `currentSpecGeneration` or `upgradeTargetGeneration`. If it
equals neither, we do not know how to bring the job to the desired version.
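To make the three cases concrete, here is a rough sketch of the comparison. `GenerationCheck` and `Outcome` are hypothetical names used only for illustration, and the sketch does not show what the operator should actually do in each branch. It only shows that the third case has no obvious handling.

```java
// Hypothetical classification of the deployed generation against the two
// generations the observer knows about. Names are illustrative only.
final class GenerationCheck {
    enum Outcome {
        UPGRADE_TARGET_DEPLOYED, // deployed == upgradeTargetGeneration
        CURRENT_SPEC_DEPLOYED,   // deployed == currentSpecGeneration
        UNKNOWN                  // deployed matches neither generation
    }

    private GenerationCheck() {}

    static Outcome classify(Long deployed, Long upgradeTarget, Long currentSpec) {
        if (deployed == null) {
            return Outcome.UNKNOWN;
        }
        // Same order as in the diff: the pending upgrade target is checked first.
        if (deployed.equals(upgradeTarget)) {
            return Outcome.UPGRADE_TARGET_DEPLOYED;
        }
        if (deployed.equals(currentSpec)) {
            return Outcome.CURRENT_SPEC_DEPLOYED;
        }
        // Neither matches: there is no known way to reach the desired version.
        return Outcome.UNKNOWN;
    }
}
```

For example, `GenerationCheck.classify(2L, 3L, 4L)` falls into `UNKNOWN`, which is the situation this comment is concerned about.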