Aitozi commented on code in PR #283:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/283#discussion_r908605561


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -352,22 +382,33 @@ ErrorStatusUpdateControl<R> toErrorStatusUpdateControl(
         return ErrorStatusUpdateControl.noStatusUpdate();
     }
 
-    public static Long getUpgradeTargetGeneration(FlinkDeployment deployment) {
+    /**
+     * Get spec generation for the current in progress upgrade.
+     *
+     * @param resource Flink resource.
+     * @return The spec generation for the upgrade.
+     */
+    public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> 
resource) {
         var lastSpecWithMeta =
-                deployment
-                        .getStatus()
+                resource.getStatus()
                         .getReconciliationStatus()
                         .deserializeLastReconciledSpecWithMeta();
 
         if (lastSpecWithMeta == null || lastSpecWithMeta.f1 == null) {
             // For first deployments and when migrating from before this 
feature simply return
             // current generation
-            return deployment.getMetadata().getGeneration();
+            return resource.getMetadata().getGeneration();
         }
 
         return 
lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);

Review Comment:
   Now the specWithMeta's meta is a `ObjectNode`, which will make it no schema 
and difficult to use, do you think we need to improve it here ?  



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -279,21 +279,35 @@ private void checkIfAlreadyUpgraded(FlinkDeployment 
flinkDep, Context context) {
                     Long upgradeTargetGeneration =
                             
ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
 
+                    Long currentSpecGeneration = 
flinkDep.getMetadata().getGeneration();
+
                     if (deployedGeneration.equals(upgradeTargetGeneration)) {
-                        logger.info(
-                                "Last reconciled generation is already 
deployed, setting reconciliation status to "
-                                        + ReconciliationState.DEPLOYED);
+                        logger.info("Pending upgrade is already deployed, 
updating status.");
 
                         var firstDeploy =
                                 
status.getReconciliationStatus().getLastReconciledSpec() == null;
-                        var conf =
-                                firstDeploy
-                                        ? configManager.getDeployConfig(
-                                                flinkDep.getMetadata(), 
flinkDep.getSpec())
-                                        : 
configManager.getObserveConfig(flinkDep);
+                        if (firstDeploy) {
+                            
ReconciliationUtils.updateForSpecReconciliationSuccess(
+                                    flinkDep,
+                                    JobState.RUNNING,
+                                    configManager.getDeployConfig(
+                                            flinkDep.getMetadata(), 
flinkDep.getSpec()));
+                        } else {
+                            
ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDep);
+                        }
+
+                        status.getJobStatus()
+                                
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+                    } else if 
(deployedGeneration.equals(currentSpecGeneration)) {

Review Comment:
   I think we have a problem here, we can only do the thing when the 
deployedGeneration equal to the `currentSpecGeneration` or the 
`upgradeTargetGeneration`. If they are not equal, then we do not know how to 
bring the job do the desired version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to