Copilot commented on code in PR #10075:
URL: https://github.com/apache/seatunnel/pull/10075#discussion_r2601431377

##########
docs/en/seatunnel-engine/rest-api-v2.md:
##########
@@ -853,12 +853,22 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"
 
 <details>
  <summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns jobId if job stoped successfully.)</code></summary>

Review Comment:
   Typo: "stoped" should be "stopped".
   ```suggestion
    <summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns jobId if job stopped successfully.)</code></summary>
   ```

##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -220,10 +220,45 @@ public void savepointJob() {
         updateJobState(JobStatus.DOING_SAVEPOINT);
     }
 
+    public void stopJob() {
+        JobStatus jobStatus = getJobStatus();
+        if (jobStatus.isEndState()) {
+            log.warn("{} is in end state {}, can not be stop", jobFullName, jobStatus);
+            return;
+        }
+
+        if (jobStatus.ordinal() <= JobStatus.PENDING.ordinal()) {
+            // Tasks with the status 'INITIALIZING', 'CREATED', 'PENDING' need to be set directly to
+            // the 'CANCELLED' state because it has not yet started running
+            updateJobState(JobStatus.CANCELED);
+        } else if (jobStatus == JobStatus.DOING_SAVEPOINT) {
+            this.pipelineList.forEach(SubPlan::finishPipelineWithCheckpointFallback);
+        } else {
+            updateJobState(JobStatus.CANCELING);

Review Comment:
   When force stopping a job that is not in the DOING_SAVEPOINT state (lines 237-238), the checkpoint coordinator should be cancelled before calling forceStopPipeline. In the DOING_SAVEPOINT branch (line 235), finishPipelineWithCheckpointFallback properly cancels the checkpoint coordinator via cancelCheckpointCoordinator(), but this step is missing in the else branch. Consider extracting the checkpoint cancellation logic, similar to the cancelJob() method, which calls cancelPipeline() and handles this internally through cancelCheckpointCoordinator().
   ```suggestion
               updateJobState(JobStatus.CANCELING);
               this.pipelineList.forEach(SubPlan::cancelCheckpointCoordinator);
   ```

##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -161,6 +161,82 @@ public void testInvocationFutureUseCompletableFutureExecutor() {
         instance.shutdown();
     }
 
+    @Test
+    void testForceStopRunningJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testStopRunningJob",
+                        "stream_fake_to_console.conf",
+                        "test_stop_running_job");
+        CoordinatorService coordinatorService = jobInformation.coordinatorService;
+
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.RUNNING,
+                                    coordinatorService.getJobStatus(jobInformation.jobId));
+                            JobMaster jobMaster =
+                                    coordinatorService.getJobMaster(jobInformation.jobId);
+                            Assertions.assertNotNull(jobMaster);
+                            Assertions.assertTrue(
+                                    jobMaster
+                                            .getRunningJobStateIMap()
+                                            .containsKey(jobInformation.jobId));
+                        });
+
+        coordinatorService.stopJob(jobInformation.jobId).join();
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.CANCELED,
+                                    coordinatorService.getJobStatus(jobInformation.jobId));
+                        });
+        jobInformation.coordinatorService.clearCoordinatorService();
+        jobInformation.coordinatorServiceTest.shutdown();
+    }
+
+    @Test
+    void testForceStopAbnormalSavepointJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testStopRunningJob",

Review Comment:
   The string on line 204 refers to "testStopRunningJob", but the enclosing test method is named "testForceStopAbnormalSavepointJob". Update the string to match the test method name for consistency and clarity.
   ```suggestion
                           "CoordinatorServiceTest_testForceStopAbnormalSavepointJob",
   ```

##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -503,6 +509,21 @@ public void restorePipeline() {
         }
     }
 
+    public void finishPipelineWithCheckpointFallback() {
+        if (jobMaster.getCheckpointManager() == null) {

Review Comment:
   When the checkpoint manager is null (line 513), the method returns early without actually stopping the pipeline, so the force stop operation is left incomplete. Consider calling `forceStopPipeline()` before the early return to ensure the pipeline is properly stopped even when checkpoints are disabled.
   ```suggestion
           if (jobMaster.getCheckpointManager() == null) {
               forceStopPipeline();
   ```

##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -161,6 +161,82 @@ public void testInvocationFutureUseCompletableFutureExecutor() {
         instance.shutdown();
     }
 
+    @Test
+    void testForceStopRunningJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testStopRunningJob",

Review Comment:
   The string on line 168 refers to "testStopRunningJob", but the enclosing test method is named "testForceStopRunningJob". Update the string to match the test method name for consistency and clarity.
   ```suggestion
                           "CoordinatorServiceTest_testForceStopRunningJob",
   ```

##########
docs/en/seatunnel-engine/rest-api-v1.md:
##########
@@ -596,12 +596,22 @@ When we can't get the job info, the response will be:
 
 <details>
  <summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> <code>(Returns jobId if job stoped successfully.)</code></summary>

Review Comment:
   Typo: "stoped" should be "stopped".
   ```suggestion
    <summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> <code>(Returns jobId if job stopped successfully.)</code></summary>
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
