wangyang0918 commented on a change in pull request #48:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/48#discussion_r822567083



##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -169,16 +189,24 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
         }
     }
 
-    private Optional<String> cancelJob(
+    private Optional<String> suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
-        Optional<String> savepointOpt =
-                flinkService.cancelJob(
-                        JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
-                        upgradeMode,
-                        effectiveConfig);
+
+        Optional<String> savepointOpt = Optional.empty();
+        if (upgradeMode == UpgradeMode.STATELESS) {
+            shutdown(flinkApp, effectiveConfig);
+        } else {
+            String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+            savepointOpt =
+                    flinkService.cancelJob(
+                            jobIdString != null ? JobID.fromHexString(jobIdString) : null,

Review comment:
       It is not ideal that the `jobId` could be `null` here. How about the 
following change?
   ```
           Optional<String> savepointOpt = Optional.empty();

           // The job should be running if UpgradeMode is savepoint
           if (!isJobRunning(flinkApp)) {
               FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
               return savepointOpt;
           }

           final String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
           savepointOpt =
                   flinkService.cancelJob(
                           JobID.fromHexString(jobIdString), upgradeMode, effectiveConfig);
   ```
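
The control flow proposed above can be sketched in isolation roughly as follows. This is a minimal, self-contained illustration and not the operator's code: `SuspendJobSketch`, its `actions` list, and the returned savepoint string are hypothetical stand-ins for `FlinkUtils.deleteCluster` and `flinkService.cancelJob`, and it assumes that a running job always has a non-null job ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in: none of these names are the operator's real classes.
class SuspendJobSketch {

    // Records side effects so the control flow can be inspected.
    static final List<String> actions = new ArrayList<>();

    static Optional<String> suspendJob(String jobIdString, boolean jobRunning) {
        // Guard clause: without a running job there is nothing to cancel,
        // so only the cluster is deleted and no savepoint is returned.
        if (!jobRunning) {
            actions.add("deleteCluster");
            return Optional.empty();
        }

        // Assumption: a running job always has a job ID, so null is a bug here.
        Objects.requireNonNull(jobIdString, "running job must have a job ID");
        actions.add("cancelJob:" + jobIdString);
        // Placeholder for the savepoint path a real cancel call would return.
        return Optional.of("savepoint-of-" + jobIdString);
    }
}
```

With the guard placed first, the `null` check no longer has to leak into the arguments of the cancel call.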

##########
File path: 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
##########
@@ -72,19 +75,26 @@ public static FlinkDeployment buildApplicationCluster() {
                         JobSpec.builder()
                                 .jarURI(SAMPLE_JAR)
                                 .parallelism(1)
+                                .upgradeMode(UpgradeMode.STATELESS)
                                 .state(JobState.RUNNING)
                                 .build());
         return deployment;
     }
 
     public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+        conf.put(

Review comment:
       We could use `HighAvailabilityOptions.HA_MODE` and 
`KubernetesHaServicesFactory.class.getCanonicalName()` here.

##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -76,14 +78,15 @@ public JobReconciler(
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
             ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
-            return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
-                    flinkApp, operatorConfiguration);
+            return;
+        }
+
+        if (SavepointUtils.savepointInProgress(flinkApp)) {
+            return;

Review comment:
       It would be great to have a log here, so that users clearly know the 
reconciliation is delayed because a savepoint is in progress.
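
A rough sketch of what logging before the early return might look like. The logging framework used by the operator is not visible in this hunk, so this example uses `java.util.logging` purely to stay self-contained; `ReconcileLogSketch`, the `reconcile` signature, and the message wording are all hypothetical.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the reconciler; only the early-return shape matters.
class ReconcileLogSketch {

    private static final Logger LOG =
            Logger.getLogger(ReconcileLogSketch.class.getName());

    // Returns true if reconciliation went ahead, false if it was postponed.
    static boolean reconcile(String deploymentName, boolean savepointInProgress) {
        if (savepointInProgress) {
            // Say why nothing happens, so the delay is visible in the operator logs.
            LOG.info("Delaying reconciliation of " + deploymentName
                    + " because a savepoint is in progress");
            return false;
        }
        // ... the actual reconciliation steps would follow here ...
        return true;
    }
}
```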

##########
File path: 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -295,6 +302,113 @@ public void verifyStatelessUpgrade() {
         assertEquals(null, jobs.get(0).f0);
     }
 
+    @Test
+    public void testUpgradeNotReadyCluster() {
+        testUpgradeNotReadyCluster(TestUtils.buildSessionCluster(), true);
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        testUpgradeNotReadyCluster(appCluster, true);
+
+        appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        testUpgradeNotReadyCluster(appCluster, true);
+
+        appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        testUpgradeNotReadyCluster(appCluster, false);
+    }
+
+    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+        mockServer

Review comment:
       I wonder whether we could use `@EnableKubernetesMockClient(crud = 
true)` for this test class, just as `FlinkServiceTest` does.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

