rgsriram commented on code in PR #438:
URL: https://github.com/apache/flink-kubernetes-operator/pull/438#discussion_r1023511476


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -362,11 +362,21 @@ public void cancelSessionJob(
             FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
             throws Exception {
 
-        var jobStatus = sessionJob.getStatus().getJobStatus();
+        var sessionJobStatus = sessionJob.getStatus();
+        var jobStatus = sessionJobStatus.getJobStatus();
         var jobIdString = jobStatus.getJobId();
         Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null");
         var jobId = JobID.fromHexString(jobIdString);
         Optional<String> savepointOpt = Optional.empty();
+
+        if (ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
+            LOG.info("Job is already in terminal state.");

Review Comment:
   I observed the following behaviour. When the job is already in a terminal state (COMPLETED/FINISHED/FAILED), no exception is thrown: execution reaches the terminal-state branch and then still continues to cancel the job, which is not expected. I checked this with both STATELESS and STATEFUL jobs; the image below shows only the STATELESS case for reference. BTW, I used print statements to confirm that both job IDs are equal, and I have added those checks as assertions in the test case.
   
   <img width="1661" alt="Screenshot 2022-11-16 at 10 09 08 AM" src="https://user-images.githubusercontent.com/7067975/202087450-061f087a-5a21-498e-a2c7-38e24bc108ad.png">
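
   The terminal-state guard in the diff only helps if it stops execution before the cancel call; logging alone lets the method fall through. A minimal, self-contained sketch of that control flow is below. It is hypothetical: the `JobState` values, the terminal set, and the names `SessionJobCancelGuard`/`cancelIfNotTerminal` are assumptions for illustration, not the operator's real types (the PR itself uses `ReconciliationUtils.isJobInTerminalState`).

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   // Hypothetical sketch of a terminal-state guard for cancelling a session job.
   // Class, enum and method names are illustrative, not the operator's actual API.
   class SessionJobCancelGuard {

       // Simplified job states; an assumption for this sketch, not Flink's JobStatus enum.
       enum JobState { RUNNING, RECONCILING, FINISHED, CANCELED, FAILED }

       // Assumption: the states treated as terminal in this sketch.
       static final Set<JobState> TERMINAL_STATES =
               EnumSet.of(JobState.FINISHED, JobState.CANCELED, JobState.FAILED);

       /**
        * Returns true if a cancel request would be issued, or false if the job is
        * already terminal and the cancel is skipped.
        */
       static boolean cancelIfNotTerminal(String jobId, JobState state) {
           if (TERMINAL_STATES.contains(state)) {
               System.out.println("Job " + jobId + " is already in terminal state " + state + ", skipping cancel.");
               // Without this return, execution would fall through to the cancel below.
               return false;
           }
           // Placeholder: the real cancel request to the Flink cluster would go here.
           System.out.println("Cancelling job " + jobId + " in state " + state);
           return true;
       }
   }
   ```

   The early `return` is the important part: it is what prevents the "continues to cancel the job" behaviour described above.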
   
   Coming to the actual error reported in the ticket, "Job not found": it occurs only when the job is not present in the session jobs list. I reproduced it with the following test case.
   
   ```java
   @Test
   public void testCancelTheDeletedSessionJob() throws Exception {
       FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
       var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
       reconciler.reconcile(sessionJob, readyContext);
       assertEquals(1, flinkService.listJobs().size());
       verifyAndSetRunningJobsToStatus(
               sessionJob,
               JobState.RUNNING,
               org.apache.flink.api.common.JobStatus.RECONCILING.name(),
               null,
               flinkService.listJobs());
   
       var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
       var jobConfig = flinkService.listJobs().get(0).f2;
   
       // JobID must be equal.
       assertEquals(
               statelessSessionJob.getStatus().getJobStatus().getJobId(),
               flinkService.listJobs().get(0).f1.getJobId().toHexString());
   
       statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
       statelessSessionJob.getSpec().getJob().setParallelism(2);
       // job suspended first
       reconciler.reconcile(statelessSessionJob, readyContext);
       assertEquals(
               org.apache.flink.api.common.JobStatus.FINISHED,
               flinkService.listJobs().get(0).f1.getJobState());
       verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
   
       // Test - cancel the deleted session job
       flinkService.listJobs().remove(0);
   
       flinkService.cancelSessionJob(
               statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
   }
   ```
   
   Running it, I got the exception below. I have not included this test case in the PR, as I felt it was not required.
   
    
   <img width="1657" alt="Screenshot 2022-11-16 at 9 48 17 AM" src="https://user-images.githubusercontent.com/7067975/202088349-e7831c43-fffa-4e14-8a43-ee1c90921418.png">
   
   IMO, the exception is correct because the job itself is not present. However, I think we could check whether the job exists before cancelling it. If it does not exist, we need to throw an error, but execution is cut short before that point.
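
   The suggestion above, looking the job up before cancelling and failing explicitly if it is missing, could look roughly like the sketch below. It is hypothetical: the session cluster's job list is modelled as plain hex job-ID strings, and `ExistenceCheckedCancel`/`cancel` are illustrative names, not part of `AbstractFlinkService`.

   ```java
   import java.util.List;
   import java.util.Objects;

   // Hypothetical sketch: confirm the job is still listed on the session cluster
   // before cancelling it. Names and the List<String> model are assumptions.
   class ExistenceCheckedCancel {

       /** Returns the job ID that would be cancelled; throws if it is not listed. */
       static String cancel(List<String> listedJobIds, String jobIdHex) {
           Objects.requireNonNull(jobIdHex, "The job to be cancelled should not be null");
           if (!listedJobIds.contains(jobIdHex)) {
               // Fail fast with a descriptive message instead of a lower-level "Job not found".
               throw new IllegalStateException(
                       "Job " + jobIdHex + " is not present in the session cluster");
           }
           // Placeholder: the real cancel request would be sent to the cluster here.
           return jobIdHex;
       }
   }
   ```

   Whether a missing job should raise an error or simply be treated as already gone is a design choice for the reconciler; this sketch follows the comment and throws.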
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
