rgsriram commented on code in PR #438:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/438#discussion_r1035964102
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -232,6 +232,218 @@ public void testStatelessUpgrade() throws Exception {
flinkService.listJobs());
}
+ @Test
+ public void testCancelStatelessSessionJob() throws Exception { // Submits one session job, then cancels it with UpgradeMode.STATELESS from several forced status states and checks the resulting job state each time.
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+ reconciler.reconcile(sessionJob, readyContext);
+ assertEquals(1, flinkService.listJobs().size());
+ verifyAndSetRunningJobsToStatus(
+ sessionJob,
+ JobState.RUNNING,
+ org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ null,
+ flinkService.listJobs());
+
+ var statelessSessionJob = ReconciliationUtils.clone(sessionJob); // all later mutations are applied to this clone, not to sessionJob
+ var jobConfig = flinkService.listJobs().get(0).f2; // NOTE(review): f2 is presumably the configuration stored with the listed job — confirm against the test flinkService
+
+ // The job ID recorded in the clone's status must equal the ID (hex string) of the
+ // single job listed by flinkService.
+ assertEquals(
+ statelessSessionJob.getStatus().getJobStatus().getJobId(),
+ flinkService.listJobs().get(0).f1.getJobId().toHexString());
+
+ // Case RUNNING -> FINISHED: switch the spec to STATELESS with parallelism 2, cancel,
+ // reconcile, then check both the listed job state and the resource's job state.
+
statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+ statelessSessionJob.getSpec().getJob().setParallelism(2);
+ flinkService.cancelSessionJob(statelessSessionJob,
UpgradeMode.STATELESS, jobConfig);
+ reconciler.reconcile(statelessSessionJob, readyContext);
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.FINISHED,
+ flinkService.listJobs().get(0).f1.getJobState());
+ verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+ // Case FAILING -> FINISHED: force the status state to FAILING and cancel again
+ // (no reconcile call in this or the following cases).
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.FAILING.name());
+ flinkService.cancelSessionJob(statelessSessionJob,
UpgradeMode.STATELESS, jobConfig);
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.FINISHED,
+ flinkService.listJobs().get(0).f1.getJobState());
+ verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+ // Case CANCELLING -> FINISHED: force the status state to CANCELLING, cancel, verify.
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.CANCELLING.name());
+ flinkService.cancelSessionJob(statelessSessionJob,
UpgradeMode.STATELESS, jobConfig);
+ verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+ // Case RESTARTING -> FINISHED: force the status state to RESTARTING, cancel, verify.
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
+ flinkService.cancelSessionJob(statelessSessionJob,
UpgradeMode.STATELESS, jobConfig);
+ verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+ // Case FAILED -> FINISHED: force the status state to FAILED, cancel, verify.
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.FAILED.name());
+ flinkService.cancelSessionJob(statelessSessionJob,
UpgradeMode.STATELESS, jobConfig);
+ verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+ // Case FINISHED -> FINISHED: force the status state to FINISHED, cancel, verify.
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+ flinkService.cancelSessionJob(statelessSessionJob,
UpgradeMode.STATELESS, jobConfig);
+ verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+ // Case CANCELED -> FINISHED: force the status state to CANCELED, cancel, verify.
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.CANCELED.name());
+ flinkService.cancelSessionJob(statelessSessionJob,
UpgradeMode.STATELESS, jobConfig);
+ verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+ }
+
+ @Test
+ public void testCancelStatefulSessionJob() throws Exception {
Review Comment:
Thanks for your valuable time in reviewing :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]