Aitozi commented on code in PR #283:
URL: https://github.com/apache/flink-kubernetes-operator/pull/283#discussion_r911568620


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -99,15 +103,20 @@ public final void reconcile(CR cr, Context ctx) throws Exception {
         // No further logic is required at this point.
         if (firstDeployment) {
             LOG.info("Deploying for the first time");
+
+            // Before we try to submit the job we record the current spec in the status so we can
+            // handle subsequent deployment and status update errors
+            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+            statusRecorder.patchAndCacheStatus(cr);
+
             deploy(
                     cr,
                     spec,
                     status,
                     deployConfig,
                     Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                     false);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    cr, JobState.RUNNING, deployConfig);
+            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);

Review Comment:
   If `deploy` throws an exception, the resource may not actually have been deployed, so I think the `firstDeployment` condition is incomplete. IMO, it should cover:
   
   - `lastReconciledSpec == null`, or
   - the state is `ReconciliationState.UPGRADING` and the JobManager deployment status is `JobManagerDeploymentStatus.MISSING`
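   The condition proposed above could be expressed as a dedicated predicate. This is a minimal sketch only: `StatusView`, `FirstDeploymentCheck` and the two trimmed-down enums are hypothetical stand-ins for the operator's status classes, and the predicate simply encodes the two cases listed in the comment.

```java
/** Stand-in for the operator's reconciliation state enum (assumption, simplified). */
enum ReconciliationState { DEPLOYED, UPGRADING }

/** Stand-in for the JobManager deployment status enum (assumption, simplified). */
enum JobManagerDeploymentStatus { READY, DEPLOYING, MISSING }

/** Hypothetical flattened view of the status fields the check reads. */
final class StatusView {
    final String lastReconciledSpec; // serialized spec; null if nothing was ever recorded
    final ReconciliationState state;
    final JobManagerDeploymentStatus jmDeploymentStatus;

    StatusView(
            String lastReconciledSpec,
            ReconciliationState state,
            JobManagerDeploymentStatus jmDeploymentStatus) {
        this.lastReconciledSpec = lastReconciledSpec;
        this.state = state;
        this.jmDeploymentStatus = jmDeploymentStatus;
    }
}

/** Hypothetical helper holding the proposed first-deployment condition. */
final class FirstDeploymentCheck {
    private FirstDeploymentCheck() {}

    /**
     * Treats the resource as not yet deployed if no spec was ever recorded, or if a spec was
     * recorded for an upgrade but no JobManager deployment exists (deploy may have failed
     * after the status had already been written).
     */
    static boolean isFirstDeployment(StatusView status) {
        if (status.lastReconciledSpec == null) {
            return true;
        }
        return status.state == ReconciliationState.UPGRADING
                && status.jmDeploymentStatus == JobManagerDeploymentStatus.MISSING;
    }
}
```

   One thing to double-check if this is adopted: `UPGRADING` with a missing JobManager deployment may also describe an upgrade of an existing job whose redeploy failed, where restoring from `initialSavepointPath` might not be what is wanted.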



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -262,8 +262,11 @@ private void onMissingDeployment(FlinkDeployment deployment) {
      * @param context Context for reconciliation.
      */
     private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
-        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
         var status = flinkDep.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {

Review Comment:
   +1. I also think the condition should be improved: since the last reconciled spec is now recorded in the status before `deploy` is called, it may be present even though the deployment never actually finished. We should take this case into account.
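   One way to account for this on the observer side is to treat a recorded spec as pending until a Kubernetes `Deployment` is actually observed. The sketch below is hypothetical: `AlreadyUpgradedCheck`, `UpgradeCheckResult` and the `deploymentExists` flag (which in the observer would presumably come from `context.getSecondaryResource(Deployment.class).isPresent()`) are illustrative, not existing operator code.

```java
/** Stand-in for the operator's reconciliation state enum (assumption, simplified). */
enum ReconciliationState { DEPLOYED, UPGRADING }

/** Hypothetical outcome of the observer-side check. */
enum UpgradeCheckResult {
    NOTHING_RECORDED,     // no spec in the status: nothing to verify yet
    NO_PENDING_UPGRADE,   // the recorded spec was already confirmed as deployed
    UPGRADE_COMPLETED,    // spec recorded before deploy, and the Deployment exists
    UPGRADE_NOT_FINISHED  // spec recorded before deploy, but no Deployment was created
}

/** Hypothetical helper; not the operator's checkIfAlreadyUpgraded implementation. */
final class AlreadyUpgradedCheck {
    private AlreadyUpgradedCheck() {}

    static UpgradeCheckResult check(
            String lastReconciledSpec, ReconciliationState state, boolean deploymentExists) {
        if (lastReconciledSpec == null) {
            return UpgradeCheckResult.NOTHING_RECORDED;
        }
        if (state != ReconciliationState.UPGRADING) {
            return UpgradeCheckResult.NO_PENDING_UPGRADE;
        }
        // The spec was written to the status before deploy(); only the presence of the
        // Kubernetes Deployment tells us whether that attempt actually went through.
        return deploymentExists
                ? UpgradeCheckResult.UPGRADE_COMPLETED
                : UpgradeCheckResult.UPGRADE_NOT_FINISHED;
    }
}
```

   Presence alone does not prove the `Deployment` was created from the recorded spec; a stricter variant might also compare the generation stored alongside the spec.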



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -106,19 +107,27 @@ protected void reconcileSpecChange(
             // We must record the upgrade mode used to the status later
             currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
             cancelJob(resource, availableUpgradeMode.get(), observeConfig);
-            newState = JobState.SUSPENDED;
+            if (desiredJobState == JobState.RUNNING) {
+                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);

Review Comment:
   Isn't this redundant? We don't perform the upgrade in this branch; I think that happens in the `if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING)` branch.
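   To make the point concrete, here is a hypothetical, heavily simplified model of the branch structure that only records which steps run. It assumes that after `cancelJob` the method falls through to the `SUSPENDED -> RUNNING` branch in the same call (so an upgrade is cancel followed by redeploy); `SpecChangeFlow` and the step names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the operator's JobState enum (assumption, simplified). */
enum JobState { RUNNING, SUSPENDED }

/**
 * Hypothetical model of reconcileSpecChange that records the steps taken, to show
 * where the pre-deployment status update would live.
 */
final class SpecChangeFlow {
    final List<String> steps = new ArrayList<>();

    void reconcileSpecChange(JobState currentJobState, JobState desiredJobState) {
        if (currentJobState == JobState.RUNNING) {
            // Suspend or upgrade: the running job is cancelled first. Nothing is deployed
            // here, so no "before deployment attempt" status update is needed.
            steps.add("cancelJob");
            currentJobState = JobState.SUSPENDED;
        }
        if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
            // The only branch that deploys, so the spec is recorded here, right before deploy.
            steps.add("updateStatusBeforeDeploymentAttempt");
            steps.add("deploy");
            steps.add("updateStatusForDeployedSpec");
        }
    }
}
```

   With this shape an upgrade (`RUNNING -> RUNNING`) still records the attempt exactly once, just before `deploy`, while a plain suspend never touches it.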



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java:
##########
@@ -133,7 +135,16 @@ protected void rollback(FlinkDeployment deployment, Context ctx, Configuration o
         FlinkDeploymentSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
         Configuration rollbackConfig =
                 configManager.getDeployConfig(deployment.getMetadata(), rollbackSpec);
-        upgradeSessionCluster(deployment, rollbackSpec, rollbackConfig);
+
+        deleteSessionCluster(deployment, observeConfig);
+        deploy(

Review Comment:
   Why don't we need to record the target upgrade status in the rollback? I think the rollback should deploy the stable spec together with its corresponding generation. WDYT?
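   A sketch of what recording the rollback target might look like, mirroring the record/deploy/update sequence this PR uses for first deployments. `ReconStatus`, `SessionRollbackSketch`, the `ROLLING_BACK`/`ROLLED_BACK` states and the plain-string spec are hypothetical simplifications, not the operator's actual types.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in reconciliation states (assumption; rollback states are illustrative). */
enum ReconciliationState { DEPLOYED, UPGRADING, ROLLING_BACK, ROLLED_BACK }

/** Hypothetical minimal status holding only what the rollback sketch touches. */
final class ReconStatus {
    String lastReconciledSpec;
    long lastReconciledGeneration;
    ReconciliationState state = ReconciliationState.DEPLOYED;
}

/** Hypothetical rollback flow; the step strings stand in for real status patches and calls. */
final class SessionRollbackSketch {
    final List<String> steps = new ArrayList<>();

    void rollback(ReconStatus status, String lastStableSpec, long lastStableGeneration) {
        // 1. Record the rollback target (stable spec plus its generation) before touching
        //    the cluster, so a failure part-way through leaves an accurate status behind.
        status.lastReconciledSpec = lastStableSpec;
        status.lastReconciledGeneration = lastStableGeneration;
        status.state = ReconciliationState.ROLLING_BACK;
        steps.add("patchStatus");

        // 2. Tear down and redeploy with the stable spec (deleteSessionCluster + deploy in the diff).
        steps.add("deleteSessionCluster");
        steps.add("deploy:" + lastStableSpec);

        // 3. Only after deploy returns, mark the rollback as done.
        status.state = ReconciliationState.ROLLED_BACK;
        steps.add("patchStatus");
    }
}
```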



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java:
##########
@@ -133,7 +135,16 @@ protected void rollback(FlinkDeployment deployment, Context ctx, Configuration o
         FlinkDeploymentSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
         Configuration rollbackConfig =
                 configManager.getDeployConfig(deployment.getMetadata(), rollbackSpec);
-        upgradeSessionCluster(deployment, rollbackSpec, rollbackConfig);
+
+        deleteSessionCluster(deployment, observeConfig);
+        deploy(

Review Comment:
   I also noticed that the general deploy flow has become:
   - record the target upgrade/first deployment in the status
   - deploy
   - update the deployed spec
   
   So I think we could move these steps into `deploy` directly.
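   The suggestion amounts to turning `deploy` into a template method. A minimal sketch, assuming a hypothetical `DeployingReconciler` base class whose abstract hooks reuse method names from the diff with simplified signatures; `doDeploy` and `RecordingReconciler` are illustrative names, not operator APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical base class: deploy() always records the attempt, deploys, and then
 * records the deployed spec. Hook names mirror the diff; signatures are assumptions.
 */
abstract class DeployingReconciler<SPEC> {

    final void deploy(SPEC spec) throws Exception {
        updateStatusBeforeDeploymentAttempt(spec);
        patchAndCacheStatus(); // persist before submitting, as the diff does
        doDeploy(spec);        // if this throws, the recorded attempt stays in the status
        updateStatusForDeployedSpec(spec);
    }

    protected abstract void updateStatusBeforeDeploymentAttempt(SPEC spec);

    protected abstract void patchAndCacheStatus();

    protected abstract void doDeploy(SPEC spec) throws Exception;

    protected abstract void updateStatusForDeployedSpec(SPEC spec);
}

/** Test double that records the order in which the steps run. */
final class RecordingReconciler extends DeployingReconciler<String> {
    final List<String> calls = new ArrayList<>();
    private final boolean failDeploy;

    RecordingReconciler(boolean failDeploy) {
        this.failDeploy = failDeploy;
    }

    @Override
    protected void updateStatusBeforeDeploymentAttempt(String spec) {
        calls.add("recordAttempt:" + spec);
    }

    @Override
    protected void patchAndCacheStatus() {
        calls.add("patchStatus");
    }

    @Override
    protected void doDeploy(String spec) throws Exception {
        calls.add("deploy:" + spec);
        if (failDeploy) {
            throw new IllegalStateException("submission failed");
        }
    }

    @Override
    protected void updateStatusForDeployedSpec(String spec) {
        calls.add("recordDeployed:" + spec);
    }
}
```

   A side effect of this shape is that a failure in `doDeploy` leaves exactly the recorded attempt behind, which is the state the first-deployment and `checkIfAlreadyUpgraded` conditions discussed in this review would need to detect.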



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -99,15 +103,20 @@ public final void reconcile(CR cr, Context ctx) throws Exception {
         // No further logic is required at this point.
         if (firstDeployment) {
             LOG.info("Deploying for the first time");
+
+            // Before we try to submit the job we record the current spec in the status so we can
+            // handle subsequent deployment and status update errors
+            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+            statusRecorder.patchAndCacheStatus(cr);
+
             deploy(
                     cr,
                     spec,
                     status,
                     deployConfig,
                     Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                     false);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    cr, JobState.RUNNING, deployConfig);
+            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);

Review Comment:
   I second @morhidi's suggestion to make `isFirstDeploy` a dedicated method.



-- 
This is an automated message from the Apache Git Service.