gaborgsomogyi commented on code in PR #438:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/438#discussion_r1035860669


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -232,6 +232,218 @@ public void testStatelessUpgrade() throws Exception {
                 flinkService.listJobs());
     }
 
+    @Test

Review Comment:
   I think this can be compacted such a way but that said in my previous 
comment these are passing all the time no matter if your fix is there or not:
   ```
       @ParameterizedTest
       @EnumSource(org.apache.flink.api.common.JobStatus.class)
       public void 
testCancelStatelessSessionJob(org.apache.flink.api.common.JobStatus 
fromJobStatus) throws Exception {
           FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
   
           var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
   
           reconciler.reconcile(sessionJob, readyContext);
           assertEquals(1, flinkService.listJobs().size());
           verifyAndSetRunningJobsToStatus(
                   sessionJob,
                   JobState.RUNNING,
                   org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                   null,
                   flinkService.listJobs());
   
           var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
           var jobConfig = flinkService.listJobs().get(0).f2;
   
           // JobID must be equal.
           assertEquals(
                   statelessSessionJob.getStatus().getJobStatus().getJobId(),
                   flinkService.listJobs().get(0).f1.getJobId().toHexString());
   
           
statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
           statelessSessionJob.getSpec().getJob().setParallelism(2);
           reconciler.reconcile(statelessSessionJob, readyContext);
   
           statelessSessionJob
                   .getStatus()
                   .getJobStatus()
                   .setState(fromJobStatus.name());
           flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
           assertEquals(
                   org.apache.flink.api.common.JobStatus.FINISHED,
                   flinkService.listJobs().get(0).f1.getJobState());
           verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
       }
   ```
   



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -232,6 +232,218 @@ public void testStatelessUpgrade() throws Exception {
                 flinkService.listJobs());
     }
 
+    @Test
+    public void testCancelStatelessSessionJob() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+        reconciler.reconcile(sessionJob, readyContext);
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob,
+                JobState.RUNNING,
+                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                null,
+                flinkService.listJobs());
+
+        var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
+        var jobConfig = flinkService.listJobs().get(0).f2;
+
+        // JobID must be equal.
+        assertEquals(
+                statelessSessionJob.getStatus().getJobStatus().getJobId(),
+                flinkService.listJobs().get(0).f1.getJobId().toHexString());
+
+        // Case RUNNING -> FINISHED
+        
statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        statelessSessionJob.getSpec().getJob().setParallelism(2);
+        flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
+        reconciler.reconcile(statelessSessionJob, readyContext);
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FAILING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                
.setState(org.apache.flink.api.common.JobStatus.FAILING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case CANCELLING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                
.setState(org.apache.flink.api.common.JobStatus.CANCELLING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case RESTARTING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                
.setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FAILED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FAILED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FINISHED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                
.setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case CANCELLED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                
.setState(org.apache.flink.api.common.JobStatus.CANCELED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, 
UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+    }
+
+    @Test
+    public void testCancelStatefulSessionJob() throws Exception {

Review Comment:
   * Here the same, test parts which are not expecting an exception are passing 
w/o the fix.
   * Please split the test per source state and don't create 10+ assertions 
within a single test.
   Either one can do parameterized test like I've suggested before or one can 
extract the common functionality into a function. An example can be found 
[here](https://github.com/apache/flink-kubernetes-operator/blob/80e5f3f4add50355de6964c1466ba558d46bc79d/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java#L86-L102).



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -362,25 +362,31 @@ public void cancelSessionJob(
             FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration 
conf)
             throws Exception {
 
-        var jobStatus = sessionJob.getStatus().getJobStatus();
+        var sessionJobStatus = sessionJob.getStatus();
+        var jobStatus = sessionJobStatus.getJobStatus();
         var jobIdString = jobStatus.getJobId();
         Preconditions.checkNotNull(jobIdString, "The job to be suspend should 
not be null");
         var jobId = JobID.fromHexString(jobIdString);
         Optional<String> savepointOpt = Optional.empty();
+
+        LOG.debug("Current Job State, {}", jobStatus.getState());
+
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
             final String clusterId = clusterClient.getClusterId();
             switch (upgradeMode) {
                 case STATELESS:
-                    LOG.info("Cancelling job.");
-                    clusterClient
-                            .cancel(jobId)
-                            .get(
-                                    configManager
-                                            .getOperatorConfiguration()
-                                            .getFlinkCancelJobTimeout()
-                                            .toSeconds(),
-                                    TimeUnit.SECONDS);
-                    LOG.info("Job successfully cancelled.");
+                    if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {

Review Comment:
   I've just removed this if condition and the tests are still passing. 
Something is wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to