rgsriram commented on code in PR #438:
URL: https://github.com/apache/flink-kubernetes-operator/pull/438#discussion_r1035963451


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -232,6 +232,218 @@ public void testStatelessUpgrade() throws Exception {
                 flinkService.listJobs());
     }
 
+    @Test
+    public void testCancelStatelessSessionJob() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+        reconciler.reconcile(sessionJob, readyContext);
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob,
+                JobState.RUNNING,
+                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                null,
+                flinkService.listJobs());
+
+        var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
+        var jobConfig = flinkService.listJobs().get(0).f2;
+
+        // JobID must be equal.
+        assertEquals(
+                statelessSessionJob.getStatus().getJobStatus().getJobId(),
+                flinkService.listJobs().get(0).f1.getJobId().toHexString());
+
+        // Case RUNNING -> FINISHED
+        statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        statelessSessionJob.getSpec().getJob().setParallelism(2);
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        reconciler.reconcile(statelessSessionJob, readyContext);
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FAILING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FAILING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case CANCELLING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELLING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case RESTARTING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FAILED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FAILED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FINISHED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case CANCELED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+    }
+
+    @Test
+    public void testCancelStatefulSessionJob() throws Exception {

Review Comment:
   Hi Gabor,
   
   This is expected for a stateless session job, just as in application mode. In application mode, if the stateless job cannot be shut down gracefully, the operator logs an error instead of failing and then deletes the cluster anyway. Is that what you meant? I also considered it, but it wasn't there initially. I think it should be added.
   
   ```java
   case STATELESS:
       if (ReconciliationUtils.isJobRunning(deployment.getStatus())) {
           LOG.info("Job is running, cancelling job.");
           try {
               clusterClient
                       .cancel(Preconditions.checkNotNull(jobId))
                       .get(
                               configManager
                                       .getOperatorConfiguration()
                                       .getFlinkCancelJobTimeout()
                                       .toSeconds(),
                               TimeUnit.SECONDS);
               LOG.info("Job successfully cancelled.");
           } catch (Exception e) {
               LOG.error("Could not shut down cluster gracefully, deleting...", e);
           }
       }
       deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
       break;
   ```
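   
   To make the proposal concrete for session jobs, here is a minimal, self-contained sketch of the same best-effort pattern. It deliberately avoids the operator's and Flink's classes: `StatelessCancelSketch`, `cancelBestEffort`, and the `Supplier<CompletableFuture<?>>` standing in for something like `clusterClient.cancel(jobId)` are hypothetical names, and `java.util.logging` stands in for the operator's logger. As in the application-mode branch above, a failed or timed-out cancel is logged rather than rethrown.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch; not part of flink-kubernetes-operator. */
final class StatelessCancelSketch {
    private static final Logger LOG = Logger.getLogger(StatelessCancelSketch.class.getName());

    private StatelessCancelSketch() {}

    /**
     * Attempts a graceful cancel and waits up to {@code timeout}. Any failure
     * (exception or timeout) is logged instead of rethrown, mirroring the
     * application-mode STATELESS branch.
     *
     * @param cancelCall hypothetical stand-in for a call like clusterClient.cancel(jobId)
     * @return true if the cancel completed within the timeout, false otherwise
     */
    static boolean cancelBestEffort(Supplier<CompletableFuture<?>> cancelCall, Duration timeout) {
        try {
            cancelCall.get().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            LOG.info("Job successfully cancelled.");
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, "Interrupted while cancelling job.", e);
            return false;
        } catch (Exception e) {
            // ExecutionException, TimeoutException, or a failure thrown by cancelCall itself.
            LOG.log(Level.SEVERE, "Could not cancel job gracefully, continuing...", e);
            return false;
        }
    }
}
```

   Returning a boolean instead of throwing keeps the STATELESS path non-fatal, which is what the "Stateless Cancel -> OK" rows in the table below expect, while still letting the caller record whether the cancel actually went through.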
   
   The stateful job fails as expected.
   
   <img width="1668" alt="Screenshot 2022-11-30 at 6 39 54 PM" src="https://user-images.githubusercontent.com/7067975/204805994-0bebe2fd-daba-4c2f-a922-c716f50a98a3.png">
   
   I also agree with splitting the test cases into multiple smaller functions.
   
   ```
   RUNNING JOB -> Savepoint Cancel -> OK
   TERMINAL JOB -> Savepoint Cancel -> OK
   NOT RUNNING or TERMINAL -> Savepoint Cancel -> EXCEPTION
   
   RUNNING JOB -> Stateless Cancel -> OK
   TERMINAL JOB -> Stateless Cancel -> OK
   NOT RUNNING or TERMINAL -> Stateless Cancel -> OK
   ```
   The above is the core logic for the test cases. Please feel free to correct me if I am wrong anywhere.
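   
   The expected outcomes in the table could be encoded once and shared by the split test functions, for example as the input of a parameterized test. A minimal sketch follows. It assumes that "TERMINAL" means Flink's globally terminal job states (`FINISHED`, `CANCELED`, `FAILED`) and that SAVEPOINT is the only mode that rejects a job that is neither running nor terminal. `CancelOutcomeTable` and its members are hypothetical names, and job states are passed as the `JobStatus` name strings the tests above already use.

```java
import java.util.Set;

/** Hypothetical encoding of the expected-outcome table; not operator code. */
final class CancelOutcomeTable {
    enum Mode { SAVEPOINT, STATELESS }

    enum Category { RUNNING, TERMINAL, OTHER }

    // Assumption: "TERMINAL" = Flink's globally terminal JobStatus names.
    private static final Set<String> TERMINAL_STATES = Set.of("FINISHED", "CANCELED", "FAILED");

    private CancelOutcomeTable() {}

    /** Maps a JobStatus name (e.g. "FAILING") to one of the table's row categories. */
    static Category categorize(String jobStatusName) {
        if ("RUNNING".equals(jobStatusName)) {
            return Category.RUNNING;
        }
        return TERMINAL_STATES.contains(jobStatusName) ? Category.TERMINAL : Category.OTHER;
    }

    /** True only for the "NOT RUNNING or TERMINAL -> Savepoint Cancel -> EXCEPTION" row. */
    static boolean expectsException(Mode mode, String jobStatusName) {
        return mode == Mode.SAVEPOINT && categorize(jobStatusName) == Category.OTHER;
    }
}
```

   Each small test can then set the job state, call `cancelSessionJob`, and assert either an exception or a clean return based on `expectsException`, instead of repeating the same block for every state.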
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
