gyfora commented on code in PR #356:
URL: https://github.com/apache/flink-kubernetes-operator/pull/356#discussion_r959399419


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -98,12 +99,39 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode(
                                 .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
                 && FlinkUtils.isKubernetesHAActivated(deployConfig)
                 && FlinkUtils.isKubernetesHAActivated(observeConfig)
-                && flinkService.isHaMetadataAvailable(deployConfig)
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
-            LOG.info(
-                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
-            return Optional.of(UpgradeMode.LAST_STATE);
+
+            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {

Review Comment:
   It would be nice to encapsulate the logic inside this branch in a
   method or utility; there are quite a few things happening in there.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -98,12 +99,39 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode(
                                 .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
                 && FlinkUtils.isKubernetesHAActivated(deployConfig)
                 && FlinkUtils.isKubernetesHAActivated(observeConfig)
-                && flinkService.isHaMetadataAvailable(deployConfig)
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
-            LOG.info(
-                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
-            return Optional.of(UpgradeMode.LAST_STATE);
+
+            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
+                    // initial deployment failure, reset to allow for spec change to proceed
+                    flinkService.deleteClusterDeployment(
+                            deployment.getMetadata(), deployment.getStatus(), false);
+                    flinkService.waitForClusterShutdown(deployConfig);
+                    // in case the deployment succeeded between check and delete, fall through to
+                    // the upgrade path
+                    if (!flinkService.isHaMetadataAvailable(deployConfig)) {

Review Comment:
   I think there should be an `else` branch that returns `LAST_STATE` if
   the HA metadata is actually present after shutdown.
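
   The suggested `else` branch, together with the earlier request to move this logic into its own method, could look roughly like the sketch below. It is a simplified, self-contained model and not the operator's actual code: `UpgradeModeSketch`, `HaAwareService`, `deleteClusterAndWaitForShutdown` and `resolveWhenJobNotRunning` are hypothetical names, and the return values for the two non-`LAST_STATE` cases are assumptions made only for illustration.

```java
import java.util.Optional;

/** Hypothetical, simplified model of the branch under review; not the operator's real classes. */
class UpgradeModeSketch {

    enum UpgradeMode { STATELESS, LAST_STATE }

    /** Stand-in (assumed) for the small part of the Flink service that this branch uses. */
    interface HaAwareService {
        boolean isHaMetadataAvailable();

        void deleteClusterAndWaitForShutdown();
    }

    /** Decides the upgrade mode when the job is not running; empty means "cannot upgrade yet". */
    static Optional<UpgradeMode> resolveWhenJobNotRunning(
            HaAwareService service, boolean hasLastStableSpec) {
        if (service.isHaMetadataAvailable()) {
            return Optional.of(UpgradeMode.LAST_STATE);
        }
        if (hasLastStableSpec) {
            // Assumption: a previously stable job without HA metadata is left for other handling.
            return Optional.empty();
        }
        // Initial deployment never became stable: tear it down so the spec change can proceed.
        service.deleteClusterAndWaitForShutdown();
        if (!service.isHaMetadataAvailable()) {
            // Assumption: with nothing to restore from, treat this as a fresh deployment.
            return Optional.of(UpgradeMode.STATELESS);
        } else {
            // The deployment succeeded between the first check and the delete.
            return Optional.of(UpgradeMode.LAST_STATE);
        }
    }

    public static void main(String[] args) {
        // Stub where HA metadata only shows up after the cluster has been shut down.
        HaAwareService lateSuccess = new HaAwareService() {
            private boolean deleted = false;

            @Override
            public boolean isHaMetadataAvailable() {
                return deleted;
            }

            @Override
            public void deleteClusterAndWaitForShutdown() {
                deleted = true;
            }
        };
        System.out.println(resolveWhenJobNotRunning(lateSuccess, false));
    }
}
```

   Keeping the second metadata check as an explicit `if`/`else` makes the race between the first check and the delete visible at the point where it is handled, instead of relying on falling through to code further down.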



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -98,12 +99,39 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode(
                                 .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
                 && FlinkUtils.isKubernetesHAActivated(deployConfig)
                 && FlinkUtils.isKubernetesHAActivated(observeConfig)
-                && flinkService.isHaMetadataAvailable(deployConfig)
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
-            LOG.info(
-                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
-            return Optional.of(UpgradeMode.LAST_STATE);
+
+            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
+                    // initial deployment failure, reset to allow for spec change to proceed
+                    flinkService.deleteClusterDeployment(
+                            deployment.getMetadata(), deployment.getStatus(), false);
+                    flinkService.waitForClusterShutdown(deployConfig);
+                    // in case the deployment succeeded between check and delete, fall through to
+                    // the upgrade path
+                    if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+                        LOG.info(
+                                "Job never entered stable state. Clearing previous spec to reset for initial deploy");
+                        // TODO: lastSpecWithMeta.f1.isFirstDeployment() is false

Review Comment:
   It would be nice to resolve this TODO.


