rgsriram commented on code in PR #438:
URL: https://github.com/apache/flink-kubernetes-operator/pull/438#discussion_r1023511476
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -362,11 +362,21 @@ public void cancelSessionJob(
FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
throws Exception {
- var jobStatus = sessionJob.getStatus().getJobStatus();
+ var sessionJobStatus = sessionJob.getStatus();
+ var jobStatus = sessionJobStatus.getJobStatus();
var jobIdString = jobStatus.getJobId();
Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null");
var jobId = JobID.fromHexString(jobIdString);
Optional<String> savepointOpt = Optional.empty();
+
+ if (ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
+ LOG.info("Job is already in terminal state.");
Review Comment:
I observed the following behaviour: when the job is already in a terminal state
(COMPLETED/FINISHED/FAILED), no exception is thrown. The job reaches a terminal
state and the operator still goes on to cancel it, which is not the expected
behaviour. I checked this with both STATELESS and STATEFUL jobs; for reference,
the image below shows only the STATELESS case. I used print statements to
confirm that both job IDs are equal, and I have added those checks as `assert`
statements in the test case.
<img width="1661" alt="Screenshot 2022-11-16 at 10 09 08 AM"
src="https://user-images.githubusercontent.com/7067975/202087450-061f087a-5a21-498e-a2c7-38e24bc108ad.png">
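To make the expected behaviour concrete, here is a minimal, self-contained sketch of a terminal-state guard placed before the cancel call, in the spirit of the `ReconciliationUtils.isJobInTerminalState(sessionJobStatus)` check added in the diff above. It does not use the operator's classes: the nested `JobState` enum, the `cancelIfActive` helper and the `cancelRequests` list are hypothetical stand-ins. The terminal set (FINISHED, FAILED, CANCELED) follows the classification of Flink's `JobStatus#isGloballyTerminalState()`.

```java
import java.util.ArrayList;
import java.util.List;

public class CancelGuardSketch {

    // Hypothetical stand-in for Flink's job states.
    enum JobState {
        CREATED, RUNNING, FAILING, CANCELLING, RESTARTING, SUSPENDED, FINISHED, FAILED, CANCELED;

        // Same classification as Flink's JobStatus#isGloballyTerminalState():
        // FINISHED, FAILED and CANCELED are terminal, SUSPENDED is not.
        boolean isTerminal() {
            return this == FINISHED || this == FAILED || this == CANCELED;
        }
    }

    // Records issued cancel requests (hypothetical stand-in for the call to the cluster).
    static final List<String> cancelRequests = new ArrayList<>();

    /** Issues a cancel request unless the job has already reached a terminal state. */
    static boolean cancelIfActive(String jobId, JobState state) {
        if (state.isTerminal()) {
            // Nothing left to cancel: log and return early instead of calling the cluster.
            System.out.println("Job " + jobId + " is already in terminal state " + state + ".");
            return false;
        }
        cancelRequests.add(jobId);
        return true;
    }

    public static void main(String[] args) {
        cancelIfActive("running-job", JobState.RUNNING);   // cancel request issued
        cancelIfActive("finished-job", JobState.FINISHED); // skipped, only logged
        System.out.println("Cancel requests: " + cancelRequests);
    }
}
```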
As for the actual error reported in the ticket ("Job not found"): it happens
only when the job is not present in the session jobs list. I tested this with
the following test case:
```java
@Test
public void testCancelTheDeletedSessionJob() throws Exception {
    FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
    var readyContext = TestUtils.createContextWithReadyFlinkDeployment();

    // Submit the session job and verify that it is running.
    reconciler.reconcile(sessionJob, readyContext);
    assertEquals(1, flinkService.listJobs().size());
    verifyAndSetRunningJobsToStatus(
            sessionJob,
            JobState.RUNNING,
            org.apache.flink.api.common.JobStatus.RECONCILING.name(),
            null,
            flinkService.listJobs());

    var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
    var jobConfig = flinkService.listJobs().get(0).f2;

    // The job ID in the resource status must match the one known to the Flink service.
    assertEquals(
            statelessSessionJob.getStatus().getJobStatus().getJobId(),
            flinkService.listJobs().get(0).f1.getJobId().toHexString());

    // Change the spec (STATELESS upgrade mode, new parallelism);
    // reconciling suspends the running job first.
    statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
    statelessSessionJob.getSpec().getJob().setParallelism(2);
    reconciler.reconcile(statelessSessionJob, readyContext);
    assertEquals(
            org.apache.flink.api.common.JobStatus.FINISHED,
            flinkService.listJobs().get(0).f1.getJobState());
    verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");

    // Remove the job from the service's job list, then try to cancel it.
    // This call throws because the job no longer exists.
    flinkService.listJobs().remove(0);
    flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
}
```
Running it produced the exception below. I have not included this test case in
the PR because I felt it was not required.
<img width="1657" alt="Screenshot 2022-11-16 at 9 48 17 AM"
src="https://user-images.githubusercontent.com/7067975/202088349-e7831c43-fffa-4e14-8a43-ee1c90921418.png">
IMO, the exception is correct because the job itself is not present. However, I
think we could check whether the job exists before cancelling it. If it does not
exist, we can throw an error early, which short-circuits the actual cancellation
logic.
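A minimal sketch of that suggestion follows. It assumes a hypothetical in-memory view of the session cluster's jobs (`listJobIds()`) and a hypothetical `JobNotFoundException`; neither is the operator's or Flink's actual API, and `cancelIfExists` is an illustrative name only. The point is the ordering: look the job up first and fail fast with a clear message, so the cancellation path never runs against a job that no longer exists.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class CancelExistingJobSketch {

    // Hypothetical exception: the job to cancel is not known to the session cluster.
    static class JobNotFoundException extends RuntimeException {
        JobNotFoundException(String jobId) {
            super("Job " + jobId + " not found on the session cluster");
        }
    }

    // Hypothetical in-memory stand-in for the jobs running on the session cluster.
    private final Set<String> runningJobIds = new LinkedHashSet<>();

    void submit(String jobId) {
        runningJobIds.add(jobId);
    }

    // Read-only snapshot of the current job IDs.
    Set<String> listJobIds() {
        return Set.copyOf(runningJobIds);
    }

    /** Cancels the job only after confirming it exists; otherwise fails fast. */
    void cancelIfExists(String jobId) {
        if (!listJobIds().contains(jobId)) {
            // Short-circuit: nothing after this point runs for a missing job.
            throw new JobNotFoundException(jobId);
        }
        runningJobIds.remove(jobId);
    }

    public static void main(String[] args) {
        var cluster = new CancelExistingJobSketch();
        cluster.submit("a1b2c3");
        cluster.cancelIfExists("a1b2c3"); // job exists, so it is cancelled
        try {
            cluster.cancelIfExists("a1b2c3"); // job is gone now
        } catch (JobNotFoundException e) {
            System.out.println(e.getMessage());
        }
    }
}
```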
--
This is an automated message from the Apache Git Service.