rgsriram commented on code in PR #438:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/438#discussion_r1023511476
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -362,11 +362,21 @@ public void cancelSessionJob(
FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
throws Exception {
- var jobStatus = sessionJob.getStatus().getJobStatus();
+ var sessionJobStatus = sessionJob.getStatus();
+ var jobStatus = sessionJobStatus.getJobStatus();
var jobIdString = jobStatus.getJobId();
Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null");
var jobId = JobID.fromHexString(jobIdString);
Optional<String> savepointOpt = Optional.empty();
+
+ if (ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
+ LOG.info("Job is already in terminal state.");
Review Comment:
I observed the following behaviour: when the job is already in a terminal state (COMPLETED/FINISHED/FAILED), no exception is thrown. The job reaches the terminal state and the operator still goes on to cancel it, which is not the expected behaviour. I checked this with both STATELESS and stateful jobs; the image below shows only the STATELESS case for reference.
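To make the expected behaviour concrete, here is a minimal, self-contained sketch of an early-return guard in the spirit of the `isJobInTerminalState` check added in this diff. It does not use the operator's classes: `TerminalStateGuard`, its `JobState` enum and `cancelIfNeeded` are hypothetical names, and treating FINISHED, FAILED and CANCELED as terminal is an assumption modelled on Flink's globally terminal job states.

```java
import java.util.logging.Logger;

// Minimal sketch, NOT operator code: TerminalStateGuard, JobState and
// cancelIfNeeded are hypothetical names used only for illustration.
class TerminalStateGuard {
    private static final Logger LOG = Logger.getLogger(TerminalStateGuard.class.getName());

    // Simplified stand-in for a job status. Assumption: the terminal states
    // mirror Flink's globally terminal states (FINISHED, FAILED, CANCELED).
    enum JobState {
        RUNNING, RECONCILING, FINISHED, FAILED, CANCELED;

        boolean isTerminal() {
            return this == FINISHED || this == FAILED || this == CANCELED;
        }
    }

    // Returns true if a cancel request was issued, false if it was skipped.
    static boolean cancelIfNeeded(String jobId, JobState state) {
        if (state.isTerminal()) {
            // Early return: the job has already stopped, so there is nothing to cancel.
            LOG.info("Job " + jobId + " is already in terminal state " + state + ", skipping cancel.");
            return false;
        }
        // Placeholder for the real cancel call against the session cluster.
        LOG.info("Cancelling job " + jobId);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(cancelIfNeeded("job-1", JobState.FINISHED)); // prints false
        System.out.println(cancelIfNeeded("job-2", JobState.RUNNING));  // prints true
    }
}
```

With such a guard, a job that has already reached a terminal state is logged and skipped instead of being sent a cancel request.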
<img width="1661" alt="Screenshot 2022-11-16 at 10 09 08 AM"
src="https://user-images.githubusercontent.com/7067975/202087450-061f087a-5a21-498e-a2c7-38e24bc108ad.png">
As for the actual error reported in the ticket ("Job not found"): it happens only when the job is not present in the session cluster's job list. I reproduced it with the following test case:
```java
@Test
public void testCancelTheDeletedSessionJob() throws Exception {
    FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
    var readyContext = TestUtils.createContextWithReadyFlinkDeployment();

    // Submit the session job and check that it is running on the cluster.
    reconciler.reconcile(sessionJob, readyContext);
    assertEquals(1, flinkService.listJobs().size());
    verifyAndSetRunningJobsToStatus(
            sessionJob,
            JobState.RUNNING,
            org.apache.flink.api.common.JobStatus.RECONCILING.name(),
            null,
            flinkService.listJobs());

    var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
    var jobConfig = flinkService.listJobs().get(0).f2;
    // JobID must be equal.
    assertEquals(
            statelessSessionJob.getStatus().getJobStatus().getJobId(),
            flinkService.listJobs().get(0).f1.getJobId().toHexString());

    // Change the spec (stateless upgrade mode, new parallelism) to trigger an upgrade.
    statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
    statelessSessionJob.getSpec().getJob().setParallelism(2);
    // job suspended first
    reconciler.reconcile(statelessSessionJob, readyContext);
    assertEquals(
            org.apache.flink.api.common.JobStatus.FINISHED,
            flinkService.listJobs().get(0).f1.getJobState());
    verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");

    // Test - cancel the deleted session job: simulate the deletion by removing
    // the job from the test service's job list, then try to cancel it.
    flinkService.listJobs().remove(0);
    flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
}
```
Running it produces the following exception:
<img width="1657" alt="Screenshot 2022-11-16 at 9 48 17 AM"
src="https://user-images.githubusercontent.com/7067975/202088349-e7831c43-fffa-4e14-8a43-ee1c90921418.png">
IMO the exception is correct, because the job itself is not present. However, I think we could check whether the job exists before cancelling it. If it does not exist, we should throw an error, but currently execution is cut off before that point.
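The suggested "check that the job exists before cancelling" flow could look roughly like the sketch below. It is illustrative only: `ExistenceCheckedCancel`, `JobNotFoundException` and the in-memory set of job IDs standing in for the session cluster's job list are all hypothetical, not the operator's or Flink's API.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal sketch, NOT operator code: ExistenceCheckedCancel, JobNotFoundException
// and the in-memory job-ID set are hypothetical stand-ins for the session cluster.
class ExistenceCheckedCancel {

    // Hypothetical exception with a clearer message than a raw "Job not found".
    static final class JobNotFoundException extends RuntimeException {
        JobNotFoundException(String jobId) {
            super("Job " + jobId + " does not exist in the session cluster; cannot cancel it.");
        }
    }

    // Looks the job up in the cluster's job list before issuing the cancel.
    static void cancelSessionJob(String jobId, Set<String> clusterJobIds) {
        if (!clusterJobIds.contains(jobId)) {
            // Fail fast with a descriptive error instead of attempting the cancel.
            throw new JobNotFoundException(jobId);
        }
        // Placeholder for the real cancel request; here cancellation just
        // removes the job from the simulated cluster.
        clusterJobIds.remove(jobId);
    }

    public static void main(String[] args) {
        Set<String> clusterJobs = new HashSet<>(Set.of("job-1"));
        cancelSessionJob("job-1", clusterJobs);
        System.out.println(clusterJobs.isEmpty()); // prints true
        try {
            cancelSessionJob("job-1", clusterJobs); // job is already gone
        } catch (JobNotFoundException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Throwing a dedicated exception keeps the failure explicit while giving a clearer message than the raw "Job not found" error; whether a missing job should raise or simply be skipped is a design choice for this PR.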
--
This is an automated message from the Apache Git Service.